Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

grpc client generation python example added #323

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions grpc-client-generation-python/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
## Client Generation with Protocol Buffers

gRPC is a high-performance, open-source universal RPC (Remote Procedure Call) framework that uses protocol buffers as its interface definition language. One of its powerful features is the automatic generation of client and server code from .proto files, which define the service methods and message types. This process simplifies the development of gRPC services and clients, making it easier to build distributed applications and microservices. Here's how gRPC allows client (and server) code generation based on .proto files:

## 1. Retrieve Service and Messages in .proto File

For 8.4 the .proto file can be retrieved here:

```
curl -O https://raw.githubusercontent.com/camunda/zeebe/stable/8.4/gateway-protocol/src/main/proto/gateway.proto
```

## 2. Install Required Tools
To generate Python code, you need the Protocol Buffer compiler (protoc) and the Python gRPC plugin. If you haven't installed these, you can do so as follows:

Install protoc from the [official releases](https://grpc.io/docs/protoc-installation/) page or via a package manager for your system.

```
apt install -y protobuf-compiler
```

Install the Python gRPC tools using pip:

```
pip install grpcio-tools
```

## 3. Generate Python gRPC Code

```
python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. gateway.proto
```

After running this command, you should see two new files in your directory:

- `gateway_pb2.py`: Contains the generated request and response classes.
- `gateway_pb2_grpc.py`: Contains the generated client and server classes.


### 4. Implement OAuth Interceptor

The interceptor is required to seamlessly inject authentication tokens into all outgoing gRPC requests, ensuring secure communication with the Zeebe broker without manually adding tokens to each call. It works by intercepting each call, obtaining a fresh OAuth token if necessary, and appending it to the request's metadata as an Authorization header.

Example Implementation [here](oauthinterceptor.py).

### 5. Write Zeebe Client
Example Implementation [here](zeebe_client.py).



42 changes: 42 additions & 0 deletions grpc-client-generation-python/example.bpmn
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:zeebe="http://camunda.org/schema/zeebe/1.0" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0uvm6ju" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.16.0" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.3.0">
<bpmn:process id="example" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_1ch9458</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_1ch9458" sourceRef="StartEvent_1" targetRef="Activity_1bv9kj9" />
<bpmn:endEvent id="Event_155fi0m">
<bpmn:incoming>Flow_0c6v5vz</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0c6v5vz" sourceRef="Activity_1bv9kj9" targetRef="Event_155fi0m" />
<bpmn:serviceTask id="Activity_1bv9kj9" name="example">
<bpmn:extensionElements>
<zeebe:taskDefinition type="dummy" />
</bpmn:extensionElements>
<bpmn:incoming>Flow_1ch9458</bpmn:incoming>
<bpmn:outgoing>Flow_0c6v5vz</bpmn:outgoing>
</bpmn:serviceTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="ExceptionHandling">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="179" y="99" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_155fi0m_di" bpmnElement="Event_155fi0m">
<dc:Bounds x="432" y="99" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0jj7g48_di" bpmnElement="Activity_1bv9kj9">
<dc:Bounds x="270" y="77" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_1ch9458_di" bpmnElement="Flow_1ch9458">
<di:waypoint x="215" y="117" />
<di:waypoint x="270" y="117" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0c6v5vz_di" bpmnElement="Flow_0c6v5vz">
<di:waypoint x="370" y="117" />
<di:waypoint x="432" y="117" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>
67 changes: 67 additions & 0 deletions grpc-client-generation-python/oauthinterceptor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
from collections import namedtuple
from grpc import UnaryUnaryClientInterceptor, StreamStreamClientInterceptor, UnaryStreamClientInterceptor, StreamUnaryClientInterceptor
import requests
import time

# Maintain the namedtuple definition for ClientCallDetails
ClientCallDetails = namedtuple('ClientCallDetails', ('method', 'timeout', 'metadata', 'credentials', 'wait_for_ready', 'compression'))

class OAuthInterceptor(UnaryUnaryClientInterceptor, UnaryStreamClientInterceptor, StreamUnaryClientInterceptor, StreamStreamClientInterceptor):
def __init__(self, token_url, client_id, client_secret, audience):
self.token_url = token_url
self.client_id = client_id
self.client_secret = client_secret
self.audience = audience
self.token = None
self.token_expiry = None

def get_access_token(self):
"""Fetch the access token using client credentials."""
if self.token and self.token_expiry > time.time():
return self.token

print("Retrieving new token...")
payload = {
'grant_type': 'client_credentials',
'client_id': self.client_id,
'client_secret': self.client_secret,
'audience': self.audience
}
response = requests.post(self.token_url, data=payload)
response_data = response.json()
self.token = response_data['access_token']
self.token_expiry = time.time() + response_data['expires_in'] - 60 # 60 seconds leeway

return self.token

def update_metadata(self, client_call_details, token):
metadata = [('authorization', f'Bearer {token}')]
if client_call_details.metadata is not None:
metadata.extend(client_call_details.metadata)
# Return a new ClientCallDetails instance with updated metadata
return ClientCallDetails(
client_call_details.method,
client_call_details.timeout,
metadata,
client_call_details.credentials,
client_call_details.wait_for_ready,
client_call_details.compression
)

def intercept_call(self, continuation, client_call_details, request_or_iterator):
token = self.get_access_token()
new_call_details = self.update_metadata(client_call_details, token)
return continuation(new_call_details, request_or_iterator)

# Implement the intercept method for each call type using intercept_call
def intercept_unary_unary(self, continuation, client_call_details, request):
return self.intercept_call(continuation, client_call_details, request)

def intercept_unary_stream(self, continuation, client_call_details, request):
return self.intercept_call(continuation, client_call_details, request)

def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
return self.intercept_call(continuation, client_call_details, request_iterator)

def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
return self.intercept_call(continuation, client_call_details, request_iterator)
65 changes: 65 additions & 0 deletions grpc-client-generation-python/zeebe_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import grpc
import gateway_pb2
import gateway_pb2_grpc
from oauthinterceptor import OAuthInterceptor

def run():

# OAuth Interceptor Configuration
token_url = "http://localhost:18080/auth/realms/camunda-platform/protocol/openid-connect/token"
client_id = "zeebe"
client_secret = "NFp6GKwftJ"
audience = "zeebe-api"

# Create an instance of the OAuthInterceptor
oauth_interceptor = OAuthInterceptor(token_url, client_id, client_secret, audience)

# Add interceptor to the channel
intercept_channel = grpc.intercept_channel(
grpc.insecure_channel('localhost:26500'), oauth_interceptor)

# Now use the intercepted channel to create stubs
stub = gateway_pb2_grpc.GatewayStub(intercept_channel)

topologyResponse = stub.Topology(gateway_pb2.TopologyRequest())
print(topologyResponse)

tenantIds = ['custom'] # tenantIds
fileName = "example.bpmn"
with open(fileName, 'rb') as file:
bpmn_content = file.read()
print(bpmn_content)
# Deploy Diagram
for tenantId in tenantIds:
resource = gateway_pb2.Resource(name=fileName, content=bpmn_content)
deployResult = stub.DeployResource(gateway_pb2.DeployResourceRequest(tenantId=tenantId , resources=[resource]))
print(deployResult)

# Start Instances
for tenantId in tenantIds:
for x in range(20):
stub.CreateProcessInstance(gateway_pb2.CreateProcessInstanceRequest(tenantId = tenantId, bpmnProcessId="example", version=-1 ))

# Job worker logic (Activate and complete jobs)
job_type = 'dummy'
worker = 'python-worker'
timeout = 10000 # in milliseconds
maxJobsToActivate=10
request = gateway_pb2.ActivateJobsRequest(type=job_type, worker=worker, timeout=timeout, maxJobsToActivate=maxJobsToActivate, tenantIds=tenantIds)

while True:
for activate_response in stub.ActivateJobs(request):
for job in activate_response.jobs:
print(f"Activated job {job.key}")
# Process the job here

# Complete the job
complete_request = gateway_pb2.CompleteJobRequest(jobKey=job.key, variables='{}')
stub.CompleteJob(complete_request)
print(f"Completed job {job.key}")
print("Looking for jobs again...")



if __name__ == '__main__':
run()
Loading