Skip to content

Commit

Permalink
Merge pull request #46 from JonatanMartens/development
Browse files Browse the repository at this point in the history
v2.1.0
  • Loading branch information
JonatanMartens authored Sep 29, 2020
2 parents 22fb2f5 + 8fda904 commit 4202906
Show file tree
Hide file tree
Showing 28 changed files with 284 additions and 134 deletions.
3 changes: 2 additions & 1 deletion pyzeebe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
from pyzeebe import exceptions
from pyzeebe.client.client import ZeebeClient
from pyzeebe.common import exceptions
from pyzeebe.credentials.camunda_cloud_credentials import CamundaCloudCredentials
from pyzeebe.credentials.oauth_credentials import OAuthCredentials
from pyzeebe.job.job import Job
from pyzeebe.job.job_status import JobStatus
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.task.task_decorator import TaskDecorator
from pyzeebe.worker.task_router import ZeebeTaskRouter
Expand Down
Empty file removed pyzeebe/common/__init__.py
Empty file.
87 changes: 0 additions & 87 deletions pyzeebe/common/exceptions.py

This file was deleted.

5 changes: 5 additions & 0 deletions pyzeebe/exceptions/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .job_exceptions import *
from .message_exceptions import *
from .pyzeebe_exceptions import *
from .workflow_exceptions import *
from .zeebe_exceptions import *
28 changes: 28 additions & 0 deletions pyzeebe/exceptions/job_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from pyzeebe.exceptions.pyzeebe_exceptions import PyZeebeException


class ActivateJobsRequestInvalid(PyZeebeException):
def __init__(self, task_type: str, worker: str, timeout: int, max_jobs_to_activate: int):
msg = "Failed to activate jobs. Reasons:"
if task_type == "" or task_type is None:
msg = msg + "task_type is empty, "
if worker == "" or task_type is None:
msg = msg + "worker is empty, "
if timeout < 1:
msg = msg + "job timeout is smaller than 0ms, "
if max_jobs_to_activate < 1:
msg = msg + "max_jobs_to_activate is smaller than 0ms, "

super().__init__(msg)


class JobAlreadyDeactivated(PyZeebeException):
def __init__(self, job_key: int):
super().__init__(f"Job {job_key} was already stopped (Completed/Failed/Error)")
self.job_key = job_key


class JobNotFound(PyZeebeException):
def __init__(self, job_key: int):
super().__init__(f"Job {job_key} not found")
self.job_key = job_key
5 changes: 5 additions & 0 deletions pyzeebe/exceptions/message_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from pyzeebe.exceptions.pyzeebe_exceptions import PyZeebeException


class MessageAlreadyExists(PyZeebeException):
pass
16 changes: 16 additions & 0 deletions pyzeebe/exceptions/pyzeebe_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
class PyZeebeException(Exception):
pass


class TaskNotFound(PyZeebeException):
pass


class NoVariableNameGiven(PyZeebeException):
def __init__(self, task_type: str):
super().__init__(f"No variable name given for single_value task {task_type}")
self.task_type = task_type


class NoZeebeAdapter(PyZeebeException):
pass
29 changes: 29 additions & 0 deletions pyzeebe/exceptions/workflow_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
from pyzeebe.exceptions.pyzeebe_exceptions import PyZeebeException


class WorkflowNotFound(PyZeebeException):
def __init__(self, bpmn_process_id: str, version: int):
super().__init__(
f"Workflow definition: {bpmn_process_id} with {version} was not found")
self.bpmn_process_id = bpmn_process_id
self.version = version


class WorkflowInstanceNotFound(PyZeebeException):
def __init__(self, workflow_instance_key: int):
super().__init__(f"Workflow instance key: {workflow_instance_key} was not found")
self.workflow_instance_key = workflow_instance_key


class WorkflowHasNoStartEvent(PyZeebeException):
def __init__(self, bpmn_process_id: str):
super().__init__(f"Workflow {bpmn_process_id} has no start event that can be called manually")
self.bpmn_process_id = bpmn_process_id


class WorkflowInvalid(PyZeebeException):
pass


class InvalidJSON(PyZeebeException):
pass
13 changes: 13 additions & 0 deletions pyzeebe/exceptions/zeebe_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from pyzeebe.exceptions.pyzeebe_exceptions import PyZeebeException


class ZeebeBackPressure(PyZeebeException):
pass


class ZeebeGatewayUnavailable(PyZeebeException):
pass


class ZeebeInternalError(PyZeebeException):
pass
2 changes: 1 addition & 1 deletion pyzeebe/grpc_internals/zeebe_adapter_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

import grpc

from pyzeebe.common.exceptions import ZeebeInternalError, ZeebeBackPressure, ZeebeGatewayUnavailable
from pyzeebe.credentials.base_credentials import BaseCredentials
from pyzeebe.exceptions import ZeebeBackPressure, ZeebeGatewayUnavailable, ZeebeInternalError
from pyzeebe.grpc_internals.zeebe_pb2_grpc import GatewayStub


Expand Down
7 changes: 3 additions & 4 deletions pyzeebe/grpc_internals/zeebe_job_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

import grpc

from pyzeebe.common.exceptions import JobNotFound, JobAlreadyDeactivated, ActivateJobsRequestInvalid
from pyzeebe.exceptions import ActivateJobsRequestInvalid, JobAlreadyDeactivated, JobNotFound
from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase
from pyzeebe.grpc_internals.zeebe_pb2 import (ActivateJobsRequest, CompleteJobRequest,
CompleteJobResponse, FailJobRequest, FailJobResponse, ThrowErrorRequest,
ThrowErrorResponse)
from pyzeebe.grpc_internals.zeebe_pb2 import ActivateJobsRequest, CompleteJobRequest, CompleteJobResponse, \
FailJobRequest, FailJobResponse, ThrowErrorRequest, ThrowErrorResponse
from pyzeebe.job.job import Job


Expand Down
2 changes: 1 addition & 1 deletion pyzeebe/grpc_internals/zeebe_message_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import grpc

from pyzeebe.common.exceptions import MessageAlreadyExists
from pyzeebe.exceptions import MessageAlreadyExists
from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase
from pyzeebe.grpc_internals.zeebe_pb2 import PublishMessageRequest, PublishMessageResponse

Expand Down
9 changes: 4 additions & 5 deletions pyzeebe/grpc_internals/zeebe_workflow_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,11 @@

import grpc

from pyzeebe.common.exceptions import (WorkflowNotFound, WorkflowInstanceNotFound, WorkflowInvalid,
WorkflowHasNoStartEvent, InvalidJSON)
from pyzeebe.exceptions import InvalidJSON, WorkflowNotFound, WorkflowInstanceNotFound, WorkflowHasNoStartEvent, \
WorkflowInvalid
from pyzeebe.grpc_internals.zeebe_adapter_base import ZeebeAdapterBase
from pyzeebe.grpc_internals.zeebe_pb2 import (CreateWorkflowInstanceRequest, CreateWorkflowInstanceWithResultRequest,
CancelWorkflowInstanceRequest, WorkflowRequestObject,
DeployWorkflowRequest, DeployWorkflowResponse)
from pyzeebe.grpc_internals.zeebe_pb2 import CreateWorkflowInstanceRequest, CreateWorkflowInstanceWithResultRequest, \
CancelWorkflowInstanceRequest, WorkflowRequestObject, DeployWorkflowRequest, DeployWorkflowResponse


class ZeebeWorkflowAdapter(ZeebeAdapterBase):
Expand Down
2 changes: 1 addition & 1 deletion pyzeebe/job/job.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from typing import Dict

from pyzeebe.common.exceptions import NoZeebeAdapter
from pyzeebe.exceptions import NoZeebeAdapter
from pyzeebe.job.job_status import JobStatus


Expand Down
35 changes: 27 additions & 8 deletions pyzeebe/worker/task_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
from abc import abstractmethod
from typing import Tuple, List, Callable

from pyzeebe.common.exceptions import TaskNotFound, NoVariableNameGiven
from pyzeebe.decorators.zeebe_decorator_base import ZeebeDecoratorBase
from pyzeebe.exceptions import NoVariableNameGiven, TaskNotFound
from pyzeebe.job.job import Job
from pyzeebe.task.exception_handler import ExceptionHandler
from pyzeebe.task.task import Task
Expand All @@ -26,33 +26,42 @@ def __init__(self, before: List[TaskDecorator] = None, after: List[TaskDecorator
self.tasks: List[Task] = []

def task(self, task_type: str, exception_handler: ExceptionHandler = default_exception_handler,
variables_to_fetch: List[str] = None, timeout: int = 10000, max_jobs_to_activate: int = 32,
before: List[TaskDecorator] = None, after: List[TaskDecorator] = None, single_value: bool = False,
variable_name: str = None):
"""Decorator to create a task
single_value (bool): If the function returns a single value (int, string, list) and not a dictionary set this to
True.
True. Default: False
variable_name (str): If single_value then this will be the variable name given to zeebe:
{ <variable_name>: <function_return_value> }
timeout (int): Maximum duration of the task in milliseconds. If the timeout is surpasses Zeebe will give up
on the job and retry it. Default: 10000
max_jobs_to_activate (int): Maximum jobs the worker will execute in parallel (of this task). Default: 32
"""
if single_value and not variable_name:
raise NoVariableNameGiven(task_type=task_type)

elif single_value and variable_name:
return self._non_dict_task(task_type=task_type, variable_name=variable_name,
exception_handler=exception_handler, before=before, after=after)
return self._non_dict_task(task_type=task_type, variable_name=variable_name, timeout=timeout,
max_jobs_to_activate=max_jobs_to_activate, exception_handler=exception_handler,
before=before, after=after, variables_to_fetch=variables_to_fetch)

else:
return self._dict_task(task_type=task_type, exception_handler=exception_handler, before=before, after=after)
return self._dict_task(task_type=task_type, exception_handler=exception_handler, before=before, after=after,
timeout=timeout, max_jobs_to_activate=max_jobs_to_activate,
variables_to_fetch=variables_to_fetch)

@abstractmethod
def _dict_task(self, task_type: str, exception_handler: ExceptionHandler = default_exception_handler,
before: List[TaskDecorator] = None, after: List[TaskDecorator] = None):
timeout: int = 10000, max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None,
after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None):
raise NotImplemented()

@abstractmethod
def _non_dict_task(self, task_type: str, variable_name: str,
exception_handler: ExceptionHandler = default_exception_handler,
before: List[TaskDecorator] = None, after: List[TaskDecorator] = None):
exception_handler: ExceptionHandler = default_exception_handler, timeout: int = 10000,
max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None,
after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None):
raise NotImplemented()

@staticmethod
Expand All @@ -62,6 +71,16 @@ def inner_fn(*args, **kwargs):

return inner_fn

@staticmethod
def _get_parameters_from_function(fn: Callable) -> List[str]:
parameters = fn.__code__.co_varnames
if "args" in parameters:
return []
elif "kwargs" in parameters:
return []
else:
return list(parameters)

def remove_task(self, task_type: str) -> Task:
task_index = self._get_task_index(task_type)
return self.tasks.pop(task_index)
Expand Down
28 changes: 21 additions & 7 deletions pyzeebe/worker/task_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,47 @@

class ZeebeTaskRouter(ZeebeTaskHandler):
def _dict_task(self, task_type: str, exception_handler: ExceptionHandler = default_exception_handler,
before: List[TaskDecorator] = None, after: List[TaskDecorator] = None):
timeout: int = 10000, max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None,
after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None):
def wrapper(fn: Callable[..., Dict]):
nonlocal variables_to_fetch
if not variables_to_fetch:
variables_to_fetch = self._get_parameters_from_function(fn)

task = self._create_task(task_type=task_type, task_handler=fn, exception_handler=exception_handler,
before=before, after=after)
timeout=timeout, max_jobs_to_activate=max_jobs_to_activate, before=before,
after=after, variables_to_fetch=variables_to_fetch)

self.tasks.append(task)
return fn

return wrapper

def _non_dict_task(self, task_type: str, variable_name: str,
exception_handler: ExceptionHandler = default_exception_handler,
before: List[TaskDecorator] = None, after: List[TaskDecorator] = None):
exception_handler: ExceptionHandler = default_exception_handler, timeout: int = 10000,
max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None,
after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None):
def wrapper(fn: Callable[..., Dict]):
nonlocal variables_to_fetch
if not variables_to_fetch:
variables_to_fetch = self._get_parameters_from_function(fn)

dict_fn = self._single_value_function_to_dict(variable_name=variable_name, fn=fn)

task = self._create_task(task_type=task_type, task_handler=dict_fn, exception_handler=exception_handler,
before=before, after=after)
timeout=timeout, max_jobs_to_activate=max_jobs_to_activate, before=before,
after=after, variables_to_fetch=variables_to_fetch)

self.tasks.append(task)
return fn

return wrapper

def _create_task(self, task_type: str, task_handler: Callable, exception_handler: ExceptionHandler,
before: List[TaskDecorator] = None, after: List[TaskDecorator] = None) -> Task:
task = Task(task_type=task_type, task_handler=task_handler, exception_handler=exception_handler)
timeout: int = 10000, max_jobs_to_activate: int = 32, before: List[TaskDecorator] = None,
after: List[TaskDecorator] = None, variables_to_fetch: List[str] = None) -> Task:
task = Task(task_type=task_type, task_handler=task_handler, exception_handler=exception_handler,
timeout=timeout, max_jobs_to_activate=max_jobs_to_activate, variables_to_fetch=variables_to_fetch)
return self._add_decorators_to_task(task, before or [], after or [])

def _add_decorators_to_task(self, task: Task, before: List[TaskDecorator],
Expand Down
Loading

0 comments on commit 4202906

Please sign in to comment.