This library has been instrumented with Streamdal's Python SDK.
The following environment variables must be set before launching a producer or consumer:
STREAMDAL_ADDRESS
- Address for the streamdal server (Example:
localhost:8082
)
- Address for the streamdal server (Example:
STREAMDAL_TOKEN
- Authentication token used by the server (Example:
1234
)
- Authentication token used by the server (Example:
STREAMDAL_SERVICE_NAME
- How this application/service will be identified in Streamdal Console (Example:
billing-svc
)
- How this application/service will be identified in Streamdal Console (Example:
By default, the library will not have Streamdal instrumentation enabled; to enable it,
you will need to set enable_streamdal=True
in the pika connection params:
pika.ConnectionParameters(enable_streamdal=True)
For more in-depth explanation of the changes and available settings, see What's changed?.
A fully working example is provided in examples/streamdal.
To run the example:
- Change directory to
examples/streamdal
- Start a local rabbitmq instance:
docker-compose up -d rabbitmq
- Install & start Streamdal:
curl -sSL https://sh.streamdal.com | sh
- Open a browser to verify you can see the streamdal UI at:
http://localhost:8080
- Launch a consumer:
STREAMDAL_ADDRESS=localhost:8082 \ STREAMDAL_TOKEN=1234 \ STREAMDAL_SERVICE_NAME=demo \ python consumer.py
- In another terminal, launch a producer:
STREAMDAL_ADDRESS=localhost:8082 \ STREAMDAL_TOKEN=1234 \ STREAMDAL_SERVICE_NAME=demo \ python producer.py
- Open the Streamdal Console in a browser https://localhost:8080
- Create a pipeline that detects and masks PII fields & attach it to the consumer
- Produce a message in producer terminal:
testKey:{"email":"[email protected]"}
- You should see a masked message in the consumer terminal:
{"email":"fo*********"}
- Tip: If you detach the pipeline from the consumer and paste the same message again, you will see the original, unmasked message.
By default, the shim will set the component_name
to rabbitmq
and the operation_name
to the name of the routing key you are producing to, or the string f"{queue}_{binding_key}"
being consumed from
Also, by default, if the shim runs into any errors executing .process()
,
it will swallow the errors and return the original value.
When producing, you can set streamdal_cfg
in the basic_publish()
:
cfg_produce = StreamdalRuntimeConfig(
audience=Audience(
component_name="rabbitmq",
operation_name="produce_msg",
operation_type=OPERATION_TYPE_PRODUCER
)
)
channel.basic_publish(
exchange='',
routing_key='test',
body='Hello World!',
streamdal_cfg=cfg_produce
)
When consuming, you can set streamdal_cfg
in the consume()
call:
cfg_consume = StreamdalRuntimeConfig(
audience=Audience(
component_name="rabbitmq",
operation_name="consume_msg",
operation_type=OPERATION_TYPE_CONSUMER
)
)
for method_frame, properties, body in channel.consume(queue='test', streamdal_cfg=cfg_consume):
#...
The goal of any shim is to make minimally invasive changes so that the original library remains backwards-compatible and does not present any "surprises" at runtime.
consume()
now accepts an optionalstreamdal_cfg
parameterbasic_publish()
now accepts an optionalstreamdal_cfg
parameterConnectionParameters
now accepts an optionalenable_streamdal
boolean parameter