Skip to content

Commit

Permalink
Merge pull request #18 from JonatanMartens/development
Browse files Browse the repository at this point in the history
v1.1.0
  • Loading branch information
JonatanMartens authored Sep 11, 2020
2 parents df4031a + 3ad877b commit 8244e5d
Show file tree
Hide file tree
Showing 23 changed files with 874 additions and 134 deletions.
12 changes: 3 additions & 9 deletions .github/workflows/test-python-package.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,17 @@ jobs:
strategy:
matrix:
python-version: [3.5, 3.6, 3.7, 3.8]


container: python:${{ matrix.python-version }}
steps:
- uses: actions/checkout@v2
- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v2
with:
python-version: ${{ matrix.python-version }}
- name: Install pipenv
uses: dschep/install-pipenv-action@v1

- name: Install dependencies
run: |
pip install pipenv
pipenv install --dev
- name: Test with pytest
run: |
pipenv run coverage run --source=. -m py.test
pipenv run coverage run --source=. -m py.test pyzeebe
- name: Upload to coveralls
run: |
pipenv run coveralls
Expand Down
36 changes: 36 additions & 0 deletions .github/workflows/test-zeebe-integration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
name: Integration test pyzeebe

on:
push:
branches: [ master, development, feature/*, bugfix/* ]
pull_request:
branches: [ master, development, feature/*, bugfix/* ]

jobs:
test:
env:
ZEEBE_ADDRESS: "zeebe:26500"
runs-on: ubuntu-latest
strategy:
matrix:
zeebe-version: [ "0.23.5", "0.24.2" ]
python-version: [ 3.5, 3.6, 3.7, 3.8 ]

container: python:${{ matrix.python-version }}

services:
zeebe:
image: camunda/zeebe:${{ matrix.zeebe-version }}
ports:
- 26500/tcp

steps:
- uses: actions/checkout@v2
- name: Install dependencies
run: |
pip install pipenv
pipenv install --dev
- name: Run integration tests
run: |
pipenv run pytest tests
41 changes: 40 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ Zeebe version support:

| Pyzeebe version | Tested Zeebe versions |
|:---------------:|----------------|
| 1.0.1 | 0.24.2 |
| 1.1.0 | 0.23, 0.24 |

## Getting Started
To install:
Expand All @@ -27,6 +27,8 @@ To install:

### Worker

The `ZeebeWorker` class uses threading to get and run jobs.

```python
from pyzeebe import ZeebeWorker, Task, TaskStatusController, TaskContext

Expand All @@ -48,6 +50,43 @@ worker.add_task(task) # Add task to zeebe worker
worker.work() # Now every time that a task with type example is called example_task will be called
```

Stop a worker:
```python
from threading import Event


stop_event = Event()
zeebe_worker.work(stop_event=stop_event) # Worker will begin working
stop_event.set() # Stops worker and all running jobs
```

### Client

```python
from pyzeebe import ZeebeClient

# Create a zeebe client
zeebe_client = ZeebeClient(hostname='localhost', port=26500)

# Run a workflow
workflow_instance_key = zeebe_client.run_workflow(bpmn_process_id='My zeebe workflow', variables={})

# Run a workflow and receive the result
workflow_result = zeebe_client.run_workflow_with_result(bpmn_process_id='My zeebe workflow',
timeout=10000) # Will wait 10000 milliseconds (10 seconds)

# Deploy a bpmn workflow definition
zeebe_client.deploy_workflow('workflow.bpmn')

# Cancel a running workflow
zeebe_client.cancel_workflow_instance(workflow_instance_key=12345)

# Publish message
zeebe_client.publish_message(name='message_name', correlation_key='some_id')


```

## Tests
Use the package manager [pip](https://pip.pypa.io/en/stable/) to install pyzeebe

Expand Down
20 changes: 20 additions & 0 deletions examples/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from pyzeebe import ZeebeClient

# Create a zeebe client
zeebe_client = ZeebeClient(hostname='localhost', port=26500)

# Run a workflow
workflow_instance_key = zeebe_client.run_workflow(bpmn_process_id='My zeebe workflow', variables={})

# Run a workflow and receive the result
workflow_result = zeebe_client.run_workflow_with_result(bpmn_process_id='My zeebe workflow',
timeout=10000) # Will wait 10000 milliseconds (10 seconds)

# Deploy a bpmn workflow definition
zeebe_client.deploy_workflow('workflow.bpmn')

# Cancel a running workflow
zeebe_client.cancel_workflow_instance(workflow_instance_key=12345)

# Publish message
zeebe_client.publish_message(name='message_name', correlation_key='some_id')
6 changes: 3 additions & 3 deletions examples/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from pyzeebe import Task, TaskContext, TaskStatusController, ZeebeWorker


def example_task(input: str) -> Dict:
return {'output': f'Hello world, {input}!'}
def example_task() -> Dict:
return {'output': f'Hello world, test!'}


def example_exception_handler(exc: Exception, context: TaskContext, controller: TaskStatusController) -> None:
Expand All @@ -13,7 +13,7 @@ def example_exception_handler(exc: Exception, context: TaskContext, controller:
controller.error(f'Failed to run task {context.type}. Reason: {exc}')


task = Task(task_type='example', task_handler=example_task, exception_handler=example_exception_handler)
task = Task(task_type='test', task_handler=example_task, exception_handler=example_exception_handler)

worker = ZeebeWorker() # Will use environment variable ZEEBE_ADDRESS or localhost:26500

Expand Down
2 changes: 2 additions & 0 deletions pyzeebe/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from pyzeebe.client.client import ZeebeClient
from pyzeebe.common import exceptions
from pyzeebe.task.task import Task
from pyzeebe.task.task_context import TaskContext
from pyzeebe.task.task_status_controller import TaskStatusController
Expand Down
Empty file added pyzeebe/client/__init__.py
Empty file.
34 changes: 34 additions & 0 deletions pyzeebe/client/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import Dict, List

import grpc

from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter


class ZeebeClient(object):
def __init__(self, hostname: str = None, port: int = None, channel: grpc.Channel = None):
self.zeebe_adapter = ZeebeAdapter(hostname=hostname, port=port, channel=channel)

def run_workflow(self, bpmn_process_id: str, variables: Dict = None, version: int = -1) -> int:
return self.zeebe_adapter.create_workflow_instance(bpmn_process_id=bpmn_process_id, variables=variables or {},
version=version)

def run_workflow_with_result(self, bpmn_process_id: str, variables: Dict = None, version: int = -1,
timeout: int = 0, variables_to_fetch: List[str] = None) -> Dict:
return self.zeebe_adapter.create_workflow_instance_with_result(bpmn_process_id=bpmn_process_id,
variables=variables or {}, version=version,
timeout=timeout,
variables_to_fetch=variables_to_fetch or [])

def cancel_workflow_instance(self, workflow_instance_key: int) -> int:
self.zeebe_adapter.cancel_workflow_instance(workflow_instance_key=workflow_instance_key)
return workflow_instance_key

def deploy_workflow(self, *workflow_file_path: str):
self.zeebe_adapter.deploy_workflow(*workflow_file_path)

def publish_message(self, name: str, correlation_key: str, variables: Dict = None,
time_to_live_in_milliseconds: int = 60000) -> None:
self.zeebe_adapter.publish_message(name=name, correlation_key=correlation_key,
time_to_live_in_milliseconds=time_to_live_in_milliseconds,
variables=variables or {})
75 changes: 75 additions & 0 deletions pyzeebe/client/client_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from random import randint
from uuid import uuid4

import pytest

from pyzeebe.client.client import ZeebeClient
from pyzeebe.common.exceptions import WorkflowNotFound
from pyzeebe.common.gateway_mock import GatewayMock
from pyzeebe.common.random_utils import RANDOM_RANGE

zeebe_client: ZeebeClient


@pytest.fixture(scope='module')
def grpc_add_to_server():
from pyzeebe.grpc_internals.zeebe_pb2_grpc import add_GatewayServicer_to_server
return add_GatewayServicer_to_server


@pytest.fixture(scope='module')
def grpc_servicer():
return GatewayMock()


@pytest.fixture(scope='module')
def grpc_stub_cls(grpc_channel):
from pyzeebe.grpc_internals.zeebe_pb2_grpc import GatewayStub
return GatewayStub


@pytest.fixture(autouse=True)
def run_around_tests(grpc_channel):
global zeebe_client
zeebe_client = ZeebeClient(channel=grpc_channel)
yield
zeebe_client = ZeebeClient(channel=grpc_channel)


def test_run_workflow(grpc_servicer):
bpmn_process_id = str(uuid4())
version = randint(0, 10)
grpc_servicer.mock_deploy_workflow(bpmn_process_id, version, [])
assert isinstance(zeebe_client.run_workflow(bpmn_process_id=bpmn_process_id, variables={}, version=version), int)


def test_run_workflow_with_result(grpc_servicer):
bpmn_process_id = str(uuid4())
version = randint(0, 10)
grpc_servicer.mock_deploy_workflow(bpmn_process_id, version, [])
assert isinstance(zeebe_client.run_workflow(bpmn_process_id=bpmn_process_id, variables={}, version=version), int)


def test_deploy_workflow(grpc_servicer):
bpmn_process_id = str(uuid4())
version = randint(0, 10)
grpc_servicer.mock_deploy_workflow(bpmn_process_id, version, [])
assert bpmn_process_id in grpc_servicer.deployed_workflows.keys()


def test_run_non_existent_workflow():
with pytest.raises(WorkflowNotFound):
zeebe_client.run_workflow(bpmn_process_id=str(uuid4()))


def test_run_non_existent_workflow_with_result():
with pytest.raises(WorkflowNotFound):
zeebe_client.run_workflow_with_result(bpmn_process_id=str(uuid4()))


def test_cancel_workflow_instance():
assert isinstance(zeebe_client.cancel_workflow_instance(workflow_instance_key=randint(0, RANDOM_RANGE)), int)


def test_publish_message():
zeebe_client.publish_message(name=str(uuid4()), correlation_key=str(uuid4()))
77 changes: 76 additions & 1 deletion pyzeebe/common/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,77 @@
class TaskNotFoundException(Exception):
class TaskNotFound(Exception):
pass


class WorkflowNotFound(Exception):
def __init__(self, bpmn_process_id: str, version: int):
super().__init__(
f'Workflow definition: {bpmn_process_id} with {version} was not found')
self.bpmn_process_id = bpmn_process_id
self.version = version


class WorkflowInstanceNotFound(Exception):
def __init__(self, workflow_instance_key: int):
super().__init__(f'Workflow instance key: {workflow_instance_key} was not found')
self.workflow_instance_key = workflow_instance_key


class WorkflowHasNoStartEvent(Exception):
def __init__(self, bpmn_process_id: str):
super().__init__(f"Workflow {bpmn_process_id} has no start event that can be called manually")
self.bpmn_process_id = bpmn_process_id


class ActivateJobsRequestInvalid(Exception):
def __init__(self, task_type: str, worker: str, timeout: int, max_jobs_to_activate: int):
msg = "Failed to activate jobs. Reasons:"
if task_type == "" or task_type is None:
msg = msg + "task_type is empty, "
if worker == "" or task_type is None:
msg = msg + "worker is empty, "
if timeout < 1:
msg = msg + "job timeout is smaller than 0ms, "
if max_jobs_to_activate < 1:
msg = msg + "max_jobs_to_activate is smaller than 0ms, "

super().__init__(msg)


class JobAlreadyDeactivated(Exception):
def __init__(self, job_key: int):
super().__init__(f"Job {job_key} was already stopped (Completed/Failed/Error)")
self.job_key = job_key


class JobNotFound(Exception):
def __init__(self, job_key: int):
super().__init__(f"Job {job_key} not found")
self.job_key = job_key


class WorkflowInvalid(Exception):
pass


class MessageAlreadyExists(Exception):
pass


class ElementNotFound(Exception):
pass


class InvalidJSON(Exception):
pass


class ZeebeBackPressure(Exception):
pass


class ZeebeGatewayUnavailable(Exception):
pass


class ZeebeInternalError(Exception):
pass
Loading

0 comments on commit 8244e5d

Please sign in to comment.