Skip to content

Commit

Permalink
Io camunda connector (#76)
Browse files Browse the repository at this point in the history
moved to io.camunda and Connector 0.3.0
  • Loading branch information
pierre-yves-monnet authored Nov 28, 2022
1 parent 8bb28db commit 82d2b21
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 76 deletions.
34 changes: 19 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,15 @@
<java.version>17</java.version>
<maven.compiler.target>${java.version}</maven.compiler.target>
<maven.compiler.source>${java.version}</maven.compiler.source>
<zeebe.version>8.1.0</zeebe.version>

<zeebe.version>8.1.4</zeebe.version>

<!-- 0.2.0 works fine -->
<connector-core.version>0.3.0</connector-core.version>

<connector-runtime.version>0.2.2</connector-runtime.version>
<connector-validation.version>0.2.2</connector-validation.version>

<junit.jupiter.version>5.9.1</junit.jupiter.version>
<opensagres.version>2.0.3</opensagres.version>
<spring.boot.version>2.7.4</spring.boot.version>
Expand Down Expand Up @@ -66,25 +74,21 @@
<dependency>
<groupId>io.camunda.connector</groupId>
<artifactId>connector-core</artifactId>
<version>0.2.0</version>
</dependency>
<dependency>
<groupId>io.camunda.connector</groupId>
<artifactId>connector-runtime-job-worker</artifactId>
<version>0.2.2</version>
<version>${connector-core.version}</version>
<scope>provided</scope>
</dependency>



<dependency>
<groupId>io.camunda.connector</groupId>
<artifactId>connector-validation</artifactId>
<version>0.2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/jakarta.validation/jakarta.validation-api -->
<dependency>
<groupId>jakarta.validation</groupId>
<artifactId>jakarta.validation-api</artifactId>
<version>3.0.2</version>
<version>${connector-validation.version}</version>
<scope>provided</scope>

</dependency>


<!-- JSON LocalDateTime -->
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
Expand Down Expand Up @@ -147,7 +151,7 @@
<dependency>
<groupId>io.zeebe</groupId>
<artifactId>zeebe-worker-java-testutils</artifactId>
<version>8.0.0</version>
<version>8.1.3</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/io/camunda/cherry/admin/RunnerInformation.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* ******************************************************************** */
/* */
/* WorkerInformation */
/* RunnerInformation */
/* */
/* Collect worker information from a worker. */
/* This class works as a facade. It's easy then to get the JSON */
Expand Down Expand Up @@ -110,7 +110,8 @@ public void setDisplayLogo(boolean displayLogo) {
* @return the list of errors
*/
public String getDefinitionErrors() {
return String.join(", ", runner.getDefinitionErrors());
return String.join(", ", runner.checkValidDefinition());

}

public enum TYPE_RUNNER {WORKER, CONNECTOR}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ public RunnerInformation stopWorker(@RequestParam(name = "name") String runnerNa
RunnerInformation runnerInfo = RunnerInformation.getRunnerInformation(runner);
return completeRunnerInformation(runnerInfo, false, false, null);
} catch (CherryJobRunnerFactory.OperationException e) {
if (e.exceptionCode.equals(CherryJobRunnerFactory.WORKER_NOT_FOUND))
if (CherryJobRunnerFactory.RUNNER_NOT_FOUND.equals(e.getExceptionCode()))
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "WorkerName [" + runnerName + "] not found");
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "WorkerName [" + runnerName + "] error " + e);
}
Expand All @@ -138,7 +138,7 @@ public RunnerInformation startWorker(@RequestParam(name = "name") String runnerN
RunnerInformation runnerInfo = RunnerInformation.getRunnerInformation(runner);
return completeRunnerInformation(runnerInfo, false, false, null);
} catch (CherryJobRunnerFactory.OperationException e) {
if (e.exceptionCode.equals(CherryJobRunnerFactory.WORKER_NOT_FOUND))
if (CherryJobRunnerFactory.RUNNER_NOT_FOUND.equals(e.getExceptionCode()))
throw new ResponseStatusException(HttpStatus.NOT_FOUND, "WorkerName [" + runnerName + "] not found");
throw new ResponseStatusException(HttpStatus.INTERNAL_SERVER_ERROR, "WorkerName [" + runnerName + "] error " + e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,15 @@ public List<RunnerParameter> getInputParameters() {
return Collections.emptyList();
}

/**
* Create the list and give a class.
* If the Cherry input connector is created from a basic connector, give the Input connector.
* The Cherry will be able to verify the list againts the Input: all fields are declared? All RunnerParameters exists as a member in the class?
*/
public record InputParametersInfo (List<RunnerParameter> listRunners, Class inputClass){}

public InputParametersInfo getInputParametersInfo() {
return new InputParametersInfo(Collections.emptyList(), null);
}

}
62 changes: 47 additions & 15 deletions src/main/java/io/camunda/cherry/definition/AbstractRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@
import org.springframework.beans.factory.annotation.Autowired;

import java.io.File;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.*;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public abstract class AbstractRunner {

Expand Down Expand Up @@ -688,7 +690,7 @@ private boolean containsKeyInJob(String parameterName, final ActivatedJob activa
*
* @param parameterName parameter to get the value
* @param activatedJob activated job
* @return
* @return an object
*/
protected Object getValueFromJob(String parameterName, final ActivatedJob activatedJob) {
if (activatedJob.getVariablesAsMap().containsKey(parameterName))
Expand Down Expand Up @@ -739,17 +741,6 @@ public List<BpmnError> getListBpmnErrors() {
return listBpmnErrors;
}

/**
* Check parameters. If something is not correct in the definition, then throw an error
*
* @return a list of errors
*/
public List<String> getDefinitionErrors() {
List<String> listOfErrors = new ArrayList<>();
listOfErrors.addAll(checkListParameters(listInput));
listOfErrors.addAll(checkListParameters(listOutput));
return listOfErrors;
}
/**
* Return the list of variable to fetch if this is possible, else null.
* To calculate the list:
Expand Down Expand Up @@ -825,7 +816,7 @@ public String getDescription() {
* Image must be a string like
* "data:image/svg+xml,%3Csvg xmlns='http://www.w3.org/2000/svg' width='18' height='18.0' viewBox='0 0 18 18.0' %3E%3Cg id='XMLID_238_'%3E %3Cpath id='XMLID_239_' d='m 14.708846 10.342394 c -1.122852 0.0,-2.528071 0.195852,-2.987768 0.264774 C 9.818362 8.6202,9.277026 7.4907875,9.155265 7.189665 C 9.320285 6.765678,9.894426 5.155026,9.976007 3.0864196 C 10.016246 2.0507226,9.797459 1.2768387,9.325568 .... -0.00373,0.03951 0.00969,0.0425 0.030567 z'/%3E%3C/svg%3E";
*
* @return
* @return the logo
*/
public String getLogo() {
return logo;
Expand Down Expand Up @@ -856,10 +847,51 @@ private boolean isStringEmpty(String value) {
*
* @return
*/
public boolean isValidDefinition() {
return (getIdentification().isEmpty());
public List<String> checkValidDefinition() {
List<String> listOfErrors= new ArrayList<>();
if (getIdentification().isEmpty())
listOfErrors.add("No identification");

if (this instanceof AbstractConnector) {
AbstractConnectorInput.InputParametersInfo parameterInfo=((AbstractConnector) this).getAbstractConnectorInput().getInputParametersInfo();
if (parameterInfo!=null && ! parameterInfo.listRunners().isEmpty() && parameterInfo.inputClass()!=null)
listOfErrors.addAll(confrontParameterWithClass( parameterInfo.inputClass(), parameterInfo.listRunners()));
}

listOfErrors.addAll(checkListParameters(listInput));
listOfErrors.addAll(checkListParameters(listOutput));

return listOfErrors;
}

/**
* Confront a list of RunnerParameter with a class.
* @param clazz class to confront
* @param parameters list of Runner.
* @return empty is every thing is OK, else an analysis
*/
private List<String> confrontParameterWithClass(Class clazz, List<RunnerParameter> parameters) {
List<String> listOfErrors= new ArrayList<>();

Field[] fields = clazz.getDeclaredFields();
// All fields are part of parameters?
for (Field field: fields) {
long number = parameters.stream().filter(t -> t.getName().equals(field.getName())).count();
if (number != 1)
listOfErrors.add("Class Field[" + field.getName() + "] is not part of parameters");
}

// All parameters must be part of the fields
for (RunnerParameter parameter : parameters) {
if (parameter.getName().equals("*"))
continue;
long number = Stream.of(fields).filter(t -> t.getName().equals(parameter.getName())).count();
if (number != 1)
listOfErrors.add("Parameter[" + parameter.getName() + "] is not part of fields in the class");
}

return listOfErrors;
}


private List<String> checkListParameters(List<RunnerParameter> listParameters) {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/io/camunda/cherry/ping/PingConnectorInput.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
import java.util.Arrays;
import java.util.List;

import jakarta.validation.constraints.NotEmpty;
import javax.validation.constraints.NotEmpty;


public class PingConnectorInput extends AbstractConnectorInput {

Expand Down
65 changes: 41 additions & 24 deletions src/main/java/io/camunda/cherry/runtime/CherryJobRunnerFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
/* ******************************************************************** */
package io.camunda.cherry.runtime;

import io.camunda.connector.runtime.jobworker.api.outbound.ConnectorJobHandler;
import io.camunda.connector.runtime.util.outbound.ConnectorJobHandler;
import io.camunda.zeebe.client.api.worker.JobWorker;
import io.camunda.zeebe.client.api.worker.JobWorkerBuilderStep1;
import io.camunda.cherry.definition.AbstractConnector;
Expand All @@ -26,7 +26,7 @@

@Service
public class CherryJobRunnerFactory {
public static final String WORKER_NOT_FOUND = "WorkerNotFound";
public static final String RUNNER_NOT_FOUND = "WorkerNotFound";
public static final String UNKNOWN_WORKER_CLASS = "UnknownWorkerClass";
public static final String WORKER_INVALID_DEFINITION = "WORKER_INVALID_DEFINITION";

Expand Down Expand Up @@ -56,9 +56,9 @@ public void startAll() {


for (AbstractRunner runner : listRunners) {
String errors = String.join(", ", runner.getDefinitionErrors());
if (!errors.isEmpty()) {
logger.error("Runner [" + runner.getIdentification() + "] can't start, errors " + errors);
List<String> listOfErrors = runner.checkValidDefinition();
if (!listOfErrors.isEmpty()) {
logger.error("Runner [" + runner.getIdentification() + "] can't start, errors " + String.join(", ",listOfErrors));
continue;
}

Expand All @@ -83,11 +83,9 @@ public void stopAll() {
if (running.runner != null) {
try {
stopRunner(running.runner.getIdentification());
} catch (OperationException e) {
logger.error("Error on worker [" + running.runner.getIdentification() + "]");

} catch (Exception e) {
logger.error("Error on worker [" + running.runner.getIdentification() + "]");
logger.error("Error on runner [" + running.runner.getIdentification() + "] : "+e);
}
}
}
Expand All @@ -104,45 +102,47 @@ public void stopAll() {
public boolean stopRunner(String runnerName) throws OperationException {
for (Running running : listRunnerRunning) {
if (running.runner().getIdentification().equals(runnerName)) {
closeJobWorker(running.containerJobWorker.jobWorker);
running.containerJobWorker.jobWorker = null;
closeJobWorker(running.containerJobWorker.getJobWorker());
running.containerJobWorker.setJobWorker(null);
return true;
}
}
throw new OperationException(WORKER_NOT_FOUND, "Worker not found");
throw new OperationException(RUNNER_NOT_FOUND, "Runner not found");
}

/**
* Start a runner
*
* @param runnerName name of the runner (connector/worker)
* @return true if the runner started
* @throws Exception
* @throws OperationException runner can't start
*/
public boolean startRunner(String runnerName) throws OperationException {
for (Running running : listRunnerRunning) {
if (running.runner().getIdentification().equals(runnerName)) {
if (!running.runner.isValidDefinition())
List<String> listOfErrors= running.runner.checkValidDefinition();
if (! listOfErrors.isEmpty())
throw new OperationException(WORKER_INVALID_DEFINITION, "Worker has error in the definition : "
+ String.join(";", running.runner.getDefinitionErrors()));
+ String.join(";", listOfErrors));

closeJobWorker(running.containerJobWorker.jobWorker);
running.containerJobWorker.jobWorker = null;
closeJobWorker(running.containerJobWorker.getJobWorker());
running.containerJobWorker.setJobWorker( null);
JobWorkerBuilderStep1.JobWorkerBuilderStep3 jobWorkerBuild = createJobWorker(running.runner);
running.containerJobWorker.jobWorker = jobWorkerBuild.open();
running.containerJobWorker.setJobWorker( jobWorkerBuild.open());
return true;
}
}
throw new OperationException(WORKER_NOT_FOUND, "Worker not found");
throw new OperationException(RUNNER_NOT_FOUND, "Worker not found");
}

public boolean isRunnerActive(String runnerName) throws OperationException {
for (Running running : listRunnerRunning) {
if (running.runner().getIdentification().equals(runnerName)) {
return running.containerJobWorker.jobWorker != null;

return running.containerJobWorker.getJobWorker() != null;
}
}
throw new OperationException(WORKER_NOT_FOUND, "Worker not found");
throw new OperationException(RUNNER_NOT_FOUND, "Worker not found");
}


Expand Down Expand Up @@ -202,11 +202,19 @@ else if (runner instanceof AbstractConnector abstractConnector)
* Not possible to restart a jobWorker: must be created again !
*/
private static class ContainerJobWorker {
public JobWorker jobWorker;
private JobWorker jobWorker;

public ContainerJobWorker(JobWorker jobWorker) {
this.jobWorker = jobWorker;
}

public JobWorker getJobWorker() {
return jobWorker;
}

public void setJobWorker(JobWorker jobWorker) {
this.jobWorker = jobWorker;
}
}

record Running(AbstractRunner runner, ContainerJobWorker containerJobWorker) {
Expand All @@ -216,13 +224,22 @@ record Running(AbstractRunner runner, ContainerJobWorker containerJobWorker) {
/**
* Declare an exception on an operation
*/
public class OperationException extends Exception {
public String exceptionCode;
public String explanation;
public static class OperationException extends Exception {
private final String exceptionCode;
private final String explanation;

OperationException(String exceptionCode, String explanation) {
this.exceptionCode = exceptionCode;
this.explanation = explanation;
}

public String getExceptionCode() {
return exceptionCode;
}

public String getExplanation() {
return explanation;
}

}
}
13 changes: 7 additions & 6 deletions src/main/resources/application.properties
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
# use a OnPremise Zeebe engine
# zeebe.client.broker.gateway-address=127.0.0.1:26500
zeebe.client.broker.gateway-address=127.0.0.1:26500
# zeebe.client.broker.gateway-address=host.docker.internal:26500
# zeebe.client.security.plaintext=true
zeebe.client.security.plaintext=true

# use a cloud Zeebe engine
zeebe.client.cloud.region=bru-2
zeebe.client.cloud.clusterId=f867aa3d-5ee7-4324-96e8-21f557f104af
zeebe.client.cloud.clientId=si71NBAWtUlREmfKja5oQC3M.WsT~sHa
zeebe.client.cloud.clientSecret=pCTITndkvyNsaSkfJ-ji5w38vaP19QAXWURM3UVC1.J.eVyxa44uITFv3_c8iFi2
# zeebe.client.cloud.region=bru-2
# zeebe.client.cloud.clusterId=f867aa3d-5ee7-4324-96e8-21f557f104af
# zeebe.client.cloud.clientId=si71NBAWtUlREmfKja5oQC3M.WsT~sHa
# zeebe.client.cloud.clientSecret=pCTITndkvyNsaSkfJ-ji5w38vaP19QAXWURM3UVC1.J.eVyxa44uITFv3_c8iFi2



zeebe.client.worker.maxJobsActive=32
Expand Down
Loading

0 comments on commit 82d2b21

Please sign in to comment.