Skip to content

Commit

Permalink
Change the multithreading during createProcessInstance
Browse files Browse the repository at this point in the history
  • Loading branch information
pierre-yves-monnet committed Aug 16, 2024
1 parent 1168d5d commit 8170798
Show file tree
Hide file tree
Showing 16 changed files with 189 additions and 50 deletions.
8 changes: 4 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -344,23 +344,23 @@ mvn clean install
````
Now, create a docker image
````
docker build -t pierre-yves-monnet/processautomator:1.5.2 .
docker build -t pierre-yves-monnet/processautomator:1.6.0 .
````
Push the image to the Camunda hub (you must be login first to the docker registry)
````
docker tag pierre-yves-monnet/processautomator:1.5.2 ghcr.io/camunda-community-hub/process-execution-automator:1.5.2
docker push ghcr.io/camunda-community-hub/process-execution-automator:1.5.2
docker tag pierre-yves-monnet/processautomator:1.6.0 ghcr.io/camunda-community-hub/process-execution-automator:1.6.0
docker push ghcr.io/camunda-community-hub/process-execution-automator:1.6.0
````
Tag as the latest:
````
docker tag pierre-yves-monnet/processautomator:1.5.2 ghcr.io/camunda-community-hub/process-execution-automator:latest
docker tag pierre-yves-monnet/processautomator:1.6.0 ghcr.io/camunda-community-hub/process-execution-automator:latest
docker push ghcr.io/camunda-community-hub/process-execution-automator:latest
````
Expand Down
124 changes: 102 additions & 22 deletions doc/scenarioreference/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -134,21 +134,35 @@ The parent attribute is "executions"
|------------------------|-----------------------------------------------------------------------------------------------------------|---------------------------------|
| Name | Name of execution | "name": "This is the first run" |
| policy | "STOPATFIRSTERROR" or "CONTINUE": In case of an error, what is the next move. Default is STOPATFIRSTERROR | "policy": "STOPATFIRSTERROR" |
| numberProcessInstances | Number of process instances to create. Each process instance follows steps. | "numberProcessInstances": 45 |
| Number of threads | Number of threads to execute in parallel. Default is 1. | "numberOfThreads": 5 |
| execution | If false, the execution does not start. If not present, the default value is TRUE. | "execution" : false |


Then, the execution contains a list of steps.


## STARTEVENT step

Start a new process instance.

| Parameter | Explanation | Example |
|-----------|-------------------------------|----------------------------------|
| name | name of the step, optional | "name": "Happy path start event" |
| type | Specify the type (STARTEVENT) | "type": "STARTEVENT" |
| taskId | Activity ID of start event | "activityId": "StartEvent_1" |
| Parameter | Explanation | Example |
|--------------------|-----------------------------------------------------------------|-------------------------------------------------------------------------------------|
| name | name of the step, optional | "name": "Happy path start event" |
| type | Specify the type (STARTEVENT) | "type": "STARTEVENT" |
| taskId | Activity ID of start event | "activityId": "StartEvent_1" |
| numberOfExecutions | Number of execution: number of process instance to create | "numberOfExecutions" : 300 |
| frequency | Frequence to create the <numberOfExecution>. ISO format (PT10S) | "frequency": "PT30S" |
| nbThreads | Number of threads to execute the creation in parallel (1) | "nbThreads": 30 |
| variables | List of variables (JSON file) to update | "variables": {"amount": 450, "account": "myBankAccount", "colors": ["blue","red"]} |
| variablesOperation | List of variables, but the value is an operation | |

(1) see multithreading section

The startup wake up every <frequency> time. Then, it created the <numberOfExecution> process instances during the period.
if after the period it can't create all the process instance, then it stop, and a warning is sent.
For example, with a frequency of 30 S and a number of execution of 300, two scenario:
* it wake up, and create the 300 PI in 5 second. So, it waits 25 seconds, wake up again and create 300 new PI
* it wake up. In 30 seconds, it creates only 240 PI. So, it stop the creation, send an error. Because the period is finish and a new one start, it start create again a new batch of 300.



Example

Expand Down Expand Up @@ -194,19 +208,21 @@ certain position, you may want to simulate the worker. Then, the Process-Aautoma
service task. The real worker shouldbe deactivated then. If the service task is not found, then the
scenario will have an error.

| Parameter | Explanation | Example |
|--------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------------------------------------------------------|
| name | name of the step, optional | "name": "get Score" |
| type | Specify the type (SERVICETASK) | "type": "SERVICETASK" |
| delay | Deplay to wait before looking for the task, in ISO 8601 | "delay" : "PT0.1S" waits 100 ms |
| waitingTime | Wait for maximum this time before returning an error. Process-Automator queries the engine every 500 ms until this delay. Default value is 5 minutes | "waitingTime" : "PT10S" |
| taskId | Activity ID to query | "activityId": "review" |
| topic | Topic to search the task (mandatory in C8) | "topic" : "get-score" |
| streamEnqbled | Specify if the worker use the streamEnabled function . Default is true. | "streamEnabled: true |
| variables | List of variables (JSON file) to update | "variables": {"amount": 450, "account": "myBankAccount", "colors": ["blue","red"]} |
| variablesOperation | List of variables, but the value is an operation | |
| modeExecution | Implementation: options are CLASSICAL, THREAD, THREADTOKEN. Default is CLASSICAL | "modeExecution" : "CLASSICAL" |
| numberOfExecutions | Number of execution, the task may be multi-instance. Default is 1 | "numberOfExecutions" : 3 |
| Parameter | Explanation | Example |
|--------------------|------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------|
| name | name of the step, optional | "name": "get Score" |
| type | Specify the type (SERVICETASK) | "type": "SERVICETASK" |
| delay | Deplay to wait before looking for the task, in ISO 8601 | "delay" : "PT0.1S" waits 100 ms |
| waitingTime | Wait for maximum this time before returning an error. Process-Automator queries the engine every 500 ms until this delay. Default value is 5 minutes | "waitingTime" : "PT10S" |
| taskId | Activity ID to query | "activityId": "review" |
| topic | Topic to search the task (mandatory in C8) | "topic" : "get-score" |
| streamEnqbled | Specify if the worker use the streamEnabled function . Default is true. | "streamEnabled: true |
| variables | List of variables (JSON file) to update | "variables": {"amount": 450, "account": "myBankAccount", "colors": ["blue","red"]} |
| variablesOperation | List of variables, but the value is an operation | |
| modeExecution | Implementation: options are CLASSICAL, THREAD, THREADTOKEN. Default is CLASSICAL | "modeExecution" : "CLASSICAL" |
| nbTokens | Number of token (modeExecution==THREADTOKEN).Default is 1 | "nbTokens" : 200 |

See the Multithreading section

There is different implementation for the worker. Choose the one you will use for the simulation.

Expand All @@ -231,7 +247,7 @@ just send 300 requests and wait for the return.

To control the number of threads working on the worker and to get maximum efficiency, this
implementation can be used. This is the same implementation as before, but a token acquisition is
added. To start the thread, the
added. The number of token is setup in <numberOfExecutions> value

## ENDEVENT step

Expand Down Expand Up @@ -360,4 +376,68 @@ UserTaskThreshold(Activity_DiscoverySeedExtraction_TheEnd,7)
````


# multi threading

To send more request on server, it's possible to scale the different topic

## Start event
During the start event operation, process instance are created. They are created per period.
Parameters are:

| Parameter | Explanation | Example |
|--------------------|-----------------------------------------------------------------|-------------------------------------------------------------------------------------|
| numberOfExecutions | Number of execution: number of process instance to create | "numberOfExecutions" : 300 |
| frequency | Frequency to create the <numberOfExecution>. ISO format (PT10S) | "frequency": "PT30S" |
| nbThreads | Number of threads to execute the creation in parallel (1) | "nbThreads": 30 |

The frequency is the period to create the process instance. Let's use a frequency of 30 Seconds. Each 30 seconds, it will create numberOfExecutions process instances.
If it can't create this number, it will send a error.
To access this number, it's possible to multithreads the creation. This is the "nbWorkers" parameters. A threadPool is created with this value, and the numberOfExecutions creation is sent to this thread pool.

The numberOfWorkers is part of the scenario.
It's possible to override this value via the application.yaml
```yaml
automator:
startevent:
nbThreads: 45
```
or by an environment variable in a kubernetes deployment
```yaml

env:
- name: JAVA_TOOL_OPTIONS
value: >-
-Dautomator.startevent.nbThreads=200
```
## Workers
A workers can be multi thread by two different mechanism: the asynchrounous execution and the multi threading execution
**asynchronous execution**
it consist to not execute the treatment in the handle() or execute() method. This is done via a ThreadToken or an Asynchrounous mode.
**Worker thread**
This section is only for Camunda 8. Camunda 7 have only one thread per worker.
The number of threads for a worker is set at the Zeebe connection, not worker per worker.
These parameter are
```yaml
workerExecutionThreads: 100
workerMaxJobsActive: 100
```
It can be overridden by a environment variable in the Kubernetes file
```yaml

env:
- name: JAVA_TOOL_OPTIONS
value: >-
-Dautomator.startup.serverName=zeebeCloud
-Dautomator.servers.camunda8.name=zeebeCloud
-Dautomator.servers.camunda8.workerExecutionThreads=100
```
By this method:
* you set up the server name (`automator.startup.serverName`) to use for the Zeebe Connection to `zeebeCloud`
* then, you qualify the camunda8 variable (`automator.servers.camunda8.name`) with this name. The Zeebe connection can be configured via the `automator.servers.camunda8`variables
* the number of threads are set to 100 via the `automator.servers.camunda8.workerExecutionThreads` variable.

2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<groupId>org.camunda.community.automator</groupId>
<artifactId>process-execution-automator</artifactId>

<version>1.5.2</version>
<version>1.6.0</version>
<!-- Change the banner.txt version -->
<!-- 1.5 Change OperateClient / TaskList Library -->

Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/camunda/automator/AutomatorCLI.java
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ public void run(String[] args) {
.setUserTask(configurationStartup.isPolicyExecutionUserTask())
.setWarmingUp(configurationStartup.isPolicyExecutionWarmingUp())
.setDeploymentProcess(configurationStartup.isPolicyDeployProcess())
.setDeepTracking(configurationStartup.deepTracking());
.setDeepTracking(configurationStartup.deepTracking())
.setStartEventNbThreads(configurationStartup.getStartEventNbThreads());
List<String> filterService = configurationStartup.getFilterService();
if (filterService != null) {
runParameters.setFilterExecutionServiceTask(filterService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class BpmnEngineList {
public static final String CONF_WORKER_MAX_JOBS_ACTIVE = "workerMaxJobsActive";
public static final String CONF_WORKER_EXECUTION_THREADS = "workerExecutionThreads";
public static final String CONF_TASK_LIST_URL = "taskListUrl";
public static final String CONF_TASK_LIST_USER = "taskListUserName";
public static final String CONF_TASK_LIST_USER_NAME = "taskListUserName";
public static final String CONF_TASK_LIST_PASSWORD = "taskListUserPassword";
public static final String CONF_TASK_LIST_CLIENT_ID = "taskListClientId";
public static final String CONF_TASK_LIST_CLIENT_SECRET = "taskListClientSecret";
Expand Down Expand Up @@ -226,7 +226,7 @@ private List<BpmnServerDefinition> getFromServersList() throws AutomatorExceptio
bpmnServerDefinition.operateAudience = getString(CONF_OPERATE_AUDIENCE, serverMap, null, contextLog, false);

bpmnServerDefinition.taskListUrl = getString(CONF_TASK_LIST_URL, serverMap, null, contextLog, false);
bpmnServerDefinition.taskListUserName = getString(CONF_TASK_LIST_USER, serverMap, null, contextLog, false);
bpmnServerDefinition.taskListUserName = getString(CONF_TASK_LIST_USER_NAME, serverMap, null, contextLog, false);
bpmnServerDefinition.taskListUserPassword = getString(CONF_TASK_LIST_PASSWORD, serverMap, null, contextLog,
false);
bpmnServerDefinition.taskListClientId = getString(CONF_TASK_LIST_CLIENT_ID, serverMap, null, contextLog, false);
Expand Down Expand Up @@ -370,6 +370,8 @@ private List<BpmnServerDefinition> getFromServerConfiguration() {
camunda8.operateUserName = configurationServersEngine.zeebeOperateUserName;
camunda8.operateUserPassword = configurationServersEngine.zeebeOperateUserPassword;
camunda8.taskListUrl = configurationServersEngine.zeebeTaskListUrl;
camunda8.taskListUserName = configurationServersEngine.zeebeTaskListUserName;
camunda8.taskListUserPassword= configurationServersEngine.zeebeTaskListUserPassword;
list.add(camunda8);
logger.info(
"Configuration: Camunda8 Name[{}] zeebeGateway[{}] MaxJobsActive[{}] WorkerThreads[{}] " + "OperateURL[{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,14 @@ public class ConfigurationServersEngine {
public String zeebeOperateUserName;
@Value("${automator.servers.camunda8.operateUserPassword:''}")
public String zeebeOperateUserPassword;

@Value("${automator.servers.camunda8.taskListUrl:''}")
public String zeebeTaskListUrl;
@Value("${automator.servers.camunda8.taskListUserName:''}")
public String zeebeTaskListUserName;
@Value("${automator.servers.camunda8.taskListUserPassword:''}")
public String zeebeTaskListUserPassword;

@Value("${automator.servers.camunda8.workerExecutionThreads:''}")
public String zeebeWorkerExecutionThreads;
@Value("${automator.servers.camunda8.workerMaxJobsActive:''}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ public class ConfigurationStartup {
@Value("${automator.startup.policyExecution:DEPLOYPROCESS|WARMINGUP|CREATION|SERVICETASK|USERTASK}")
public String policyExecution;

@Value("${automator.startEvent.nbThreads:#{null}}")
public Integer startEventNbThreads;

/**
* it may be necessary to wait the other component to warm up
*/
Expand Down Expand Up @@ -91,6 +94,9 @@ public boolean isPolicyDeployProcess() {
return policyExtended.contains("|DEPLOYPROCESS|");
}

public Integer getStartEventNbThreads() {
return startEventNbThreads;
}
public List<String> getScenarioFileAtStartup() {
return recalibrateAfterSplit(scenarioFileAtStartup);
}
Expand Down
Loading

0 comments on commit 8170798

Please sign in to comment.