Skip to content

Commit

Permalink
Merge pull request #383 from dimastbk/grpc-zeebe-8.4
Browse files Browse the repository at this point in the history
Bump zeebe-grpc to 8.4.0, add tenant_id and variables
  • Loading branch information
dimastbk authored May 24, 2024
2 parents f13729b + dca8c94 commit 8970af8
Show file tree
Hide file tree
Showing 18 changed files with 315 additions and 82 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
zeebe-version: ["1.3.14", "8.2.10"]
zeebe-version: ["1.3.14", "8.1.22", "8.2.21", "8.3.5", "8.4.1"]

container: python:3.11

Expand Down
51 changes: 20 additions & 31 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ python = "^3.8"
oauthlib = "~=3.1.0"
requests-oauthlib = "~=1.3.0"
aiofiles = ">=0.7,<0.9"
zeebe-grpc = "^8.0.4.post1"
zeebe-grpc = "^8.4.0"
typing-extensions = "^4.5.0"

[tool.poetry.group.dev.dependencies]
pytest = "^7.4.0"
Expand Down
35 changes: 33 additions & 2 deletions pyzeebe/client/client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Dict, List, Optional, Tuple

import grpc
from typing_extensions import deprecated

from pyzeebe.grpc_internals.zeebe_adapter import ZeebeAdapter

Expand All @@ -17,14 +18,17 @@ def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int =

self.zeebe_adapter = ZeebeAdapter(grpc_channel, max_connection_retries)

async def run_process(self, bpmn_process_id: str, variables: Optional[Dict] = None, version: int = -1) -> int:
async def run_process(
self, bpmn_process_id: str, variables: Optional[Dict] = None, version: int = -1, tenant_id: Optional[str] = None
) -> int:
"""
Run process
Args:
bpmn_process_id (str): The unique process id of the process.
variables (dict): A dictionary containing all the starting variables the process needs. Must be JSONable.
version (int): The version of the process. Default: -1 (latest)
tenant_id (str): The tenant ID of the process definition. New in Zeebe 8.3.
Returns:
int: process_instance_key, the unique id of the running process generated by Zeebe.
Expand All @@ -40,7 +44,7 @@ async def run_process(self, bpmn_process_id: str, variables: Optional[Dict] = No
"""
return await self.zeebe_adapter.create_process_instance(
bpmn_process_id=bpmn_process_id, variables=variables or {}, version=version
bpmn_process_id=bpmn_process_id, variables=variables or {}, version=version, tenant_id=tenant_id
)

async def run_process_with_result(
Expand All @@ -50,6 +54,7 @@ async def run_process_with_result(
version: int = -1,
timeout: int = 0,
variables_to_fetch: Optional[List[str]] = None,
tenant_id: Optional[str] = None,
) -> Tuple[int, Dict]:
"""
Run process and wait for the result.
Expand All @@ -60,6 +65,7 @@ async def run_process_with_result(
version (int): The version of the process. Default: -1 (latest)
timeout (int): How long to wait until a timeout occurs. Default: 0 (Zeebe default timeout)
variables_to_fetch (List[str]): Which variables to get from the finished process
tenant_id (str): The tenant ID of the process definition. New in Zeebe 8.3.
Returns:
tuple: (The process instance key, A dictionary of the end state of the process instance)
Expand All @@ -81,6 +87,7 @@ async def run_process_with_result(
version=version,
timeout=timeout,
variables_to_fetch=variables_to_fetch or [],
tenant_id=tenant_id,
)

async def cancel_process_instance(self, process_instance_key: int) -> int:
Expand All @@ -104,6 +111,7 @@ async def cancel_process_instance(self, process_instance_key: int) -> int:
await self.zeebe_adapter.cancel_process_instance(process_instance_key=process_instance_key)
return process_instance_key

@deprecated("Deprecated since Zeebe 8.0. Use deploy_resource instead")
async def deploy_process(self, *process_file_path: str) -> None:
"""
Deploy one or more processes
Expand All @@ -121,13 +129,34 @@ async def deploy_process(self, *process_file_path: str) -> None:
"""
await self.zeebe_adapter.deploy_process(*process_file_path)

async def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> None:
"""
Deploy one or more processes
New in Zeebe 8.0.
Args:
resource_file_path (str): The file path to a resource definition file (bpmn/dmn/form)
tenant_id (str): The tenant ID of the resources to deploy. New in Zeebe 8.3.
Raises:
ProcessInvalidError: If one of the process file definitions is invalid
ZeebeBackPressureError: If Zeebe is currently in back pressure (too many requests)
ZeebeGatewayUnavailableError: If the Zeebe gateway is unavailable
ZeebeInternalError: If Zeebe experiences an internal error
UnkownGrpcStatusCodeError: If Zeebe returns an unexpected status code
"""
await self.zeebe_adapter.deploy_resource(*resource_file_path, tenant_id=tenant_id)

async def publish_message(
self,
name: str,
correlation_key: str,
variables: Optional[Dict] = None,
time_to_live_in_milliseconds: int = 60000,
message_id: Optional[str] = None,
tenant_id: Optional[str] = None,
) -> None:
"""
Publish a message
Expand All @@ -139,6 +168,7 @@ async def publish_message(
time_to_live_in_milliseconds (int): How long this message should stay active. Default: 60000 ms (60 seconds)
message_id (str): A unique message id. Useful for avoiding duplication. If a message with this id is still
active, a MessageAlreadyExists will be raised.
tenant_id (str): The tenant ID of the message. New in Zeebe 8.3.
Raises:
MessageAlreadyExistError: If a message with message_id already exists
Expand All @@ -154,4 +184,5 @@ async def publish_message(
time_to_live_in_milliseconds=time_to_live_in_milliseconds,
variables=variables or {},
message_id=message_id,
tenant_id=tenant_id,
)
22 changes: 19 additions & 3 deletions pyzeebe/client/sync_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from typing import Dict, List, Optional, Tuple

import grpc
from typing_extensions import deprecated

from pyzeebe import ZeebeClient

Expand All @@ -11,8 +12,14 @@ def __init__(self, grpc_channel: grpc.aio.Channel, max_connection_retries: int =
self.loop = asyncio.get_event_loop()
self.client = ZeebeClient(grpc_channel, max_connection_retries)

def run_process(self, bpmn_process_id: str, variables: Optional[Dict] = None, version: int = -1) -> int:
return self.loop.run_until_complete(self.client.run_process(bpmn_process_id, variables, version))
def run_process(
self,
bpmn_process_id: str,
variables: Optional[Dict] = None,
version: int = -1,
tenant_id: Optional[str] = None,
) -> int:
return self.loop.run_until_complete(self.client.run_process(bpmn_process_id, variables, version, tenant_id))

def run_process_with_result(
self,
Expand All @@ -21,24 +28,32 @@ def run_process_with_result(
version: int = -1,
timeout: int = 0,
variables_to_fetch: Optional[List[str]] = None,
tenant_id: Optional[str] = None,
) -> Tuple[int, Dict]:
return self.loop.run_until_complete(
self.client.run_process_with_result(bpmn_process_id, variables, version, timeout, variables_to_fetch)
self.client.run_process_with_result(
bpmn_process_id, variables, version, timeout, variables_to_fetch, tenant_id
)
)

def cancel_process_instance(self, process_instance_key: int) -> int:
return self.loop.run_until_complete(self.client.cancel_process_instance(process_instance_key))

@deprecated("Deprecated since Zeebe 8.0. Use deploy_resource instead")
def deploy_process(self, *process_file_path: str) -> None:
return self.loop.run_until_complete(self.client.deploy_process(*process_file_path))

def deploy_resource(self, *resource_file_path: str, tenant_id: Optional[str] = None) -> None:
return self.loop.run_until_complete(self.client.deploy_resource(*resource_file_path, tenant_id=tenant_id))

def publish_message(
self,
name: str,
correlation_key: str,
variables: Optional[Dict] = None,
time_to_live_in_milliseconds: int = 60000,
message_id: Optional[str] = None,
tenant_id: Optional[str] = None,
) -> None:
return self.loop.run_until_complete(
self.client.publish_message(
Expand All @@ -47,5 +62,6 @@ def publish_message(
variables,
time_to_live_in_milliseconds,
message_id,
tenant_id,
)
)
28 changes: 23 additions & 5 deletions pyzeebe/grpc_internals/zeebe_job_adapter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import json
import logging
from typing import AsyncGenerator, Dict, List
from typing import AsyncGenerator, Dict, List, Optional

import grpc
from zeebe_grpc.gateway_pb2 import (
Expand Down Expand Up @@ -34,6 +34,7 @@ async def activate_jobs(
max_jobs_to_activate: int,
variables_to_fetch: List[str],
request_timeout: int,
tenant_ids: Optional[List[str]] = None,
) -> AsyncGenerator[Job, None]:
try:
async for response in self._gateway_stub.ActivateJobs(
Expand All @@ -44,6 +45,7 @@ async def activate_jobs(
maxJobsToActivate=max_jobs_to_activate,
fetchVariable=variables_to_fetch,
requestTimeout=request_timeout,
tenantIds=tenant_ids,
)
):
for raw_job in response.jobs:
Expand All @@ -70,6 +72,7 @@ def _create_job_from_raw_job(self, response) -> Job:
retries=response.retries,
deadline=response.deadline,
variables=json.loads(response.variables),
tenant_id=response.tenantId,
zeebe_adapter=self,
)

Expand All @@ -85,10 +88,18 @@ async def complete_job(self, job_key: int, variables: Dict) -> CompleteJobRespon
raise JobAlreadyDeactivatedError(job_key=job_key) from grpc_error
await self._handle_grpc_error(grpc_error)

async def fail_job(self, job_key: int, retries: int, message: str) -> FailJobResponse:
async def fail_job(
self, job_key: int, retries: int, message: str, retry_back_off_ms: int, variables: Dict
) -> FailJobResponse:
try:
return await self._gateway_stub.FailJob(
FailJobRequest(jobKey=job_key, retries=retries, errorMessage=message)
FailJobRequest(
jobKey=job_key,
retries=retries,
errorMessage=message,
retryBackOff=retry_back_off_ms,
variables=json.dumps(variables),
)
)
except grpc.aio.AioRpcError as grpc_error:
if is_error_status(grpc_error, grpc.StatusCode.NOT_FOUND):
Expand All @@ -97,10 +108,17 @@ async def fail_job(self, job_key: int, retries: int, message: str) -> FailJobRes
raise JobAlreadyDeactivatedError(job_key=job_key) from grpc_error
await self._handle_grpc_error(grpc_error)

async def throw_error(self, job_key: int, message: str, error_code: str = "") -> ThrowErrorResponse:
async def throw_error(
self, job_key: int, message: str, variables: Dict, error_code: str = ""
) -> ThrowErrorResponse:
try:
return await self._gateway_stub.ThrowError(
ThrowErrorRequest(jobKey=job_key, errorMessage=message, errorCode=error_code)
ThrowErrorRequest(
jobKey=job_key,
errorMessage=message,
errorCode=error_code,
variables=json.dumps(variables),
)
)
except grpc.aio.AioRpcError as grpc_error:
if is_error_status(grpc_error, grpc.StatusCode.NOT_FOUND):
Expand Down
2 changes: 2 additions & 0 deletions pyzeebe/grpc_internals/zeebe_message_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ async def publish_message(
time_to_live_in_milliseconds: int,
variables: Dict,
message_id: Optional[str] = None,
tenant_id: Optional[str] = None,
) -> PublishMessageResponse:
try:
return await self._gateway_stub.PublishMessage(
Expand All @@ -26,6 +27,7 @@ async def publish_message(
messageId=message_id,
timeToLive=time_to_live_in_milliseconds,
variables=json.dumps(variables),
tenantId=tenant_id,
)
)
except grpc.aio.AioRpcError as grpc_error:
Expand Down
Loading

0 comments on commit 8970af8

Please sign in to comment.