EMQ X can bridge and forward messages to Kafka, RabbitMQ or other EMQ X nodes. Meanwhile, mosquitto and rsm can be bridged to EMQ X using common MQTT connection.
Bridge Plugin | Config File | Description |
---|---|---|
emqx_bridge_kafka | emqx_bridge_kafka.conf | Kafka Bridge |
emqx_bridge_rabbit | emqx_bridge_rabbit.conf | RabbitMQ Bridge |
emqx_bridge_pulsar | emqx_bridge_pulsar.conf | Pulsar Bridge |
emqx_bridge_mqtt | emqx_bridge_mqtt.conf | MQTT Broker Bridge |
EMQ X bridges and forwards MQTT messages to Kafka cluster:
Config file for Kafka bridge plugin: etc/plugins/emqx_bridge_kafka.conf
## Kafka Server
## bridge.kafka.servers = 127.0.0.1:9092,127.0.0.2:9092,127.0.0.3:9092
bridge.kafka.servers = 127.0.0.1:9092
## Kafka Parition Strategy. option value: per_partition | per_broker
bridge.kafka.connection_strategy = per_partition
bridge.kafka.min_metadata_refresh_interval = 5S
## Produce writes type. option value: sync | async
bridge.kafka.produce = sync
bridge.kafka.produce.sync_timeout = 3S
## Base directory for replayq to store messages on disk.
## If this config entry if missing or set to undefined,
## replayq works in a mem-only manner.
## i.e. messages are not queued on disk -- in such case,
## the send or send_sync API callers are responsible for
## possible message loss in case of application,
## network or kafka disturbances. For instance,
## in the wolff:send API caller may trap_exit then
## react on parition-producer worker pid's 'EXIT'
## message to issue a retry after restarting the producer.
## bridge.kafka.replayq_dir = /tmp/emqx_bridge_kafka/
## default=10MB, replayq segment size.
## bridge.kafka.producer.replayq_seg_bytes = 10MB
## producer required_acks. option value all_isr | leader_only | none.
bridge.kafka.producer.required_acks = none
## default=10000. Timeout leader wait for replicas before reply to producer.
## bridge.kafka.producer.ack_timeout = 10S
## default number of message sets sent on wire before block waiting for acks
## bridge.kafka.producer.max_batch_bytes = 1024KB
## by default, send max 1 MB of data in one batch (message set)
## bridge.kafka.producer.min_batch_bytes = 0
## Number of batches to be sent ahead without receiving ack for the last request.
## Must be 0 if messages must be delivered in strict order.
## bridge.kafka.producer.max_send_ahead = 0
## by default, no compression
# bridge.kafka.producer.compression = no_compression
# bridge.kafka.encode_payload_type = base64
# bridge.kafka.sock.buffer = 32KB
# bridge.kafka.sock.recbuf = 32KB
bridge.kafka.sock.sndbuf = 1MB
# bridge.kafka.sock.read_packets = 20
## Bridge Kafka Hooks
## ${topic}: the kafka topics to which the messages will be published.
## ${filter}: the mqtt topic (may contain wildcard) on which the action will be performed .
bridge.kafka.hook.client.connected.1 = {"topic": "client_connected"}
bridge.kafka.hook.client.disconnected.1 = {"topic": "client_disconnected"}
bridge.kafka.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}
bridge.kafka.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
bridge.kafka.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}
bridge.kafka.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}
bridge.kafka.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}
Event | Description |
---|---|
bridge.kafka.hook.client.connected.1 | Client connected |
bridge.kafka.hook.client.disconnected.1 | Client disconnected |
bridge.kafka.hook.session.subscribed.1 | Topics subscribed |
bridge.kafka.hook.session.unsubscribed.1 | Topics unsubscribed |
bridge.kafka.hook.message.publish.1 | Messages published |
bridge.kafka.hook.message.delivered.1 | Messages delivered |
bridge.kafka.hook.message.acked.1 | Messages acknowledged |
Client goes online, EMQ X forwards 'client_connected' event message to Kafka:
topic = "client_connected",
value = {
"client_id": ${clientid},
"node": ${node},
"ts": ${ts}
}
Client goes offline, EMQ X forwards 'client_disconnected' event message to Kafka:
topic = "client_disconnected",
value = {
"client_id": ${clientid},
"reason": ${reason},
"node": ${node},
"ts": ${ts}
}
topic = session_subscribed
value = {
"client_id": ${clientid},
"topic": ${topic},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
topic = session_unsubscribed
value = {
"client_id": ${clientid},
"topic": ${topic},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
topic = message_publish
value = {
"client_id": ${clientid},
"username": ${username},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
topic = message_delivered
value = {"client_id": ${clientid},
"username": ${username},
"from": ${fromClientId},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
topic = message_acked
value = {
"client_id": ${clientid},
"username": ${username},
"from": ${fromClientId},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
Kafka consumes MQTT clients connected / disconnected event messages:
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic client_connected --from-beginning sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic client_disconnected --from-beginning
Kafka consumes MQTT subscription messages:
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic session_subscribed --from-beginning sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic session_unsubscribed --from-beginning
Kafka consumes MQTT published messages:
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_publish --from-beginning
Kafka consumes MQTT message Deliver and Ack event messages:
sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_delivered --from-beginning sh kafka-console-consumer.sh --zookeeper localhost:2181 --topic message_acked --from-beginning
Note
the payload is base64 encoded
./bin/emqx_ctl plugins load emqx_bridge_kafka
EMQ X bridges and forwards MQTT messages to RabbitMQ cluster:
Config file of RabbitMQ bridge plugin: etc/plugins/emqx_bridge_rabbit.conf
## Rabbit Brokers Server
bridge.rabbit.1.server = 127.0.0.1:5672
## Rabbit Brokers pool_size
bridge.rabbit.1.pool_size = 4
## Rabbit Brokers username
bridge.rabbit.1.username = guest
## Rabbit Brokers password
bridge.rabbit.1.password = guest
## Rabbit Brokers virtual_host
bridge.rabbit.1.virtual_host = /
## Rabbit Brokers heartbeat
bridge.rabbit.1.heartbeat = 30
# bridge.rabbit.2.server = 127.0.0.1:5672
# bridge.rabbit.2.pool_size = 8
# bridge.rabbit.2.username = guest
# bridge.rabbit.2.password = guest
# bridge.rabbit.2.virtual_host = /
# bridge.rabbit.2.heartbeat = 30
## Bridge Hooks
bridge.rabbit.hook.client.subscribe.1 = {"action": "on_client_subscribe", "rabbit": 1, "exchange": "direct:emq.subscription"}
bridge.rabbit.hook.client.unsubscribe.1 = {"action": "on_client_unsubscribe", "rabbit": 1, "exchange": "direct:emq.unsubscription"}
bridge.rabbit.hook.message.publish.1 = {"topic": "$SYS/#", "action": "on_message_publish", "rabbit": 1, "exchange": "topic:emq.$sys"}
bridge.rabbit.hook.message.publish.2 = {"topic": "#", "action": "on_message_publish", "rabbit": 1, "exchange": "topic:emq.pub"}
bridge.rabbit.hook.message.acked.1 = {"topic": "#", "action": "on_message_acked", "rabbit": 1, "exchange": "topic:emq.acked"}
routing_key = subscribe
exchange = emq.subscription
headers = [{<<"x-emq-client-id">>, binary, ClientId}]
payload = jsx:encode([{Topic, proplists:get_value(qos, Opts)} || {Topic, Opts} <- TopicTable])
routing_key = unsubscribe
exchange = emq.unsubscription
headers = [{<<"x-emq-client-id">>, binary, ClientId}]
payload = jsx:encode([Topic || {Topic, _Opts} <- TopicTable]),
routing_key = binary:replace(binary:replace(Topic, <<"/">>, <<".">>, [global]),<<"+">>, <<"*">>, [global])
exchange = emq.$sys | emq.pub
headers = [{<<"x-emq-publish-qos">>, byte, Qos},
{<<"x-emq-client-id">>, binary, pub_from(From)},
{<<"x-emq-publish-msgid">>, binary, emqx_base62:encode(Id)}]
payload = Payload
routing_key = puback
exchange = emq.acked
headers = [{<<"x-emq-msg-acked">>, binary, ClientId}],
payload = emqx_base62:encode(Id)
Sample code of Rabbit message Consumption in Python:
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct:emq.subscription', exchange_type='direct')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='direct:emq.subscription', queue=queue_name, routing_key= 'subscribe')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()
Sample of RabbitMQ client coding in other programming languages:
https://github.com/rabbitmq/rabbitmq-tutorials
./bin/emqx_ctl plugins load emqx_bridge_rabbit
EMQ X bridges and forwards MQTT messages to Pulsar cluster:
Config file for Pulsar bridge plugin: etc/plugins/emqx_bridge_pulsar.conf
## Pulsar Server
bridge.pulsar.servers = 127.0.0.1:6650
## Pick a partition producer and sync/async
bridge.pulsar.produce = sync
## bridge.pulsar.produce.sync_timeout = 3s
## bridge.pulsar.producer.batch_size = 1000
## by default, no compression
## bridge.pulsar.producer.compression = no_compression
## bridge.pulsar.encode_payload_type = base64
## bridge.pulsar.sock.buffer = 32KB
## bridge.pulsar.sock.recbuf = 32KB
bridge.pulsar.sock.sndbuf = 1MB
## bridge.pulsar.sock.read_packets = 20
## Bridge Pulsar Hooks
## ${topic}: the pulsar topics to which the messages will be published.
## ${filter}: the mqtt topic (may contain wildcard) on which the action will be performed .
## Client Connected Record Hook
bridge.pulsar.hook.client.connected.1 = {"topic": "client_connected"}
## Client Disconnected Record Hook
bridge.pulsar.hook.client.disconnected.1 = {"topic": "client_disconnected"}
## Session Subscribed Record Hook
bridge.pulsar.hook.session.subscribed.1 = {"filter": "#", "topic": "session_subscribed"}
## Session Unsubscribed Record Hook
bridge.pulsar.hook.session.unsubscribed.1 = {"filter": "#", "topic": "session_unsubscribed"}
## Message Publish Record Hook
bridge.pulsar.hook.message.publish.1 = {"filter": "#", "topic": "message_publish"}
## Message Delivered Record Hook
bridge.pulsar.hook.message.delivered.1 = {"filter": "#", "topic": "message_delivered"}
## Message Acked Record Hook
bridge.pulsar.hook.message.acked.1 = {"filter": "#", "topic": "message_acked"}
## More Configures
## partitioner strategy:
## Option: random | roundrobin | first_key_dispatch
## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "strategy":"random"}
## key:
## Option: ${clientid} | ${username}
## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "key":"${clientid}"}
## format:
## Option: json | json
## Example: bridge.pulsar.hook.message.publish.1 = {"filter":"#", "topic":"message_publish", "format":"json"}
Event | Description |
---|---|
bridge.pulsar.hook.client.connected.1 | Client connected |
bridge.pulsar.hook.client.disconnected.1 | Client disconnected |
bridge.pulsar.hook.session.subscribed.1 | Topics subscribed |
bridge.pulsar.hook.session.unsubscribed.1 | Topics unsubscribed |
bridge.pulsar.hook.message.publish.1 | Messages published |
bridge.pulsar.hook.message.delivered.1 | Messages delivered |
bridge.pulsar.hook.message.acked.1 | Messages acknowledged |
Client goes online, EMQ X forwards 'client_connected' event message to Pulsar:
topic = "client_connected",
value = {
"client_id": ${clientid},
"username": ${username},
"node": ${node},
"ts": ${ts}
}
Client goes offline, EMQ X forwards 'client_disconnected' event message to Pulsar:
topic = "client_disconnected",
value = {
"client_id": ${clientid},
"username": ${username},
"reason": ${reason},
"node": ${node},
"ts": ${ts}
}
topic = session_subscribed
value = {
"client_id": ${clientid},
"topic": ${topic},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
topic = session_unsubscribed
value = {
"client_id": ${clientid},
"topic": ${topic},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
topic = message_publish
value = {
"client_id": ${clientid},
"username": ${username},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
topic = message_delivered
value = {"client_id": ${clientid},
"username": ${username},
"from": ${fromClientId},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
topic = message_acked
value = {
"client_id": ${clientid},
"username": ${username},
"from": ${fromClientId},
"topic": ${topic},
"payload": ${payload},
"qos": ${qos},
"node": ${node},
"ts": ${timestamp}
}
Pulsar consumes MQTT clients connected / disconnected event messages:
sh pulsar-client consume client_connected -s "client_connected" -n 1000 sh pulsar-client consume client_disconnected -s "client_disconnected" -n 1000
Pulsar consumes MQTT subscription messages:
sh pulsar-client consume session_subscribed -s "session_subscribed" -n 1000 sh pulsar-client consume session_unsubscribed -s "session_unsubscribed" -n 1000
Pulsar consumes MQTT published messages:
sh pulsar-client consume message_publish -s "message_publish" -n 1000
Pulsar consumes MQTT message Deliver and Ack event messages:
sh pulsar-client consume message_delivered -s "message_delivered" -n 1000 sh pulsar-client consume message_acked -s "message_acked" -n 1000
Note
the payload is base64 encoded default
./bin/emqx_ctl plugins load emqx_bridge_pulsar
EMQ X bridges and forwards MQTT messages to MQTT Broker:
Config file for MQTT bridge plugin: etc/plugins/emqx_bridge_mqtt.conf
## Bridge address: node name for local bridge, host:port for remote
bridge.mqtt.aws.address = 127.0.0.1:1883
## Protocol version of the bridge: mqttv3 | mqttv4 | mqttv5
bridge.mqtt.aws.proto_ver = mqttv4
## Whether to enable bridge mode for mqtt bridge
bridge.mqtt.aws.bridge_mode = true
## The ClientId of a remote bridge
bridge.mqtt.aws.client_id = bridge_aws
## The Clean start flag of a remote bridge
## NOTE: Some IoT platforms require clean_start must be set to 'true'
bridge.mqtt.aws.clean_start = true
## The username for a remote bridge
bridge.mqtt.aws.username = user
## The password for a remote bridge
bridge.mqtt.aws.password = passwd
## Bribge to remote server via SSL
bridge.mqtt.aws.ssl = off
## PEM-encoded CA certificates of the bridge
bridge.mqtt.aws.cacertfile = etc/certs/cacert.pem
## Client SSL Certfile of the bridge
bridge.mqtt.aws.certfile = etc/certs/client-cert.pem
## Client SSL Keyfile of the bridge
bridge.mqtt.aws.keyfile = etc/certs/client-key.pem
## SSL Ciphers used by the bridge
bridge.mqtt.aws.ciphers = ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384
## Ciphers for TLS PSK
## Note that 'bridge.${BridgeName}.ciphers' and 'bridge.${BridgeName}.psk_ciphers' cannot be configured at the same time.
##
## See 'https://tools.ietf.org/html/rfc4279#section-2'.
bridge.mqtt.aws.psk_ciphers = PSK-AES128-CBC-SHA,PSK-AES256-CBC-SHA,PSK-3DES-EDE-CBC-SHA,PSK-RC4-SHA
## Ping interval of a down bridge.
bridge.mqtt.aws.keepalive = 60s
## TLS versions used by the bridge.
bridge.mqtt.aws.tls_versions = tlsv1.2,tlsv1.1,tlsv1
## Mountpoint of the bridge
bridge.mqtt.aws.mountpoint = bridge/aws/${node}/
## Forward message topics
bridge.mqtt.aws.forwards = topic1/#,topic2/#
## Subscriptions of the bridge topic
bridge.mqtt.aws.subscription.1.topic = cmd/topic1
## Subscriptions of the bridge qos
bridge.mqtt.aws.subscription.1.qos = 1
## Subscriptions of the bridge topic
bridge.mqtt.aws.subscription.2.topic = cmd/topic2
## Subscriptions of the bridge qos
bridge.mqtt.aws.subscription.2.qos = 1
Mountpoint:
Mountpoint is used to prefix of topic when forwarding a message, this option must be used with forwards
. Forwards the message whose topic is "sensor1/hello", its topic will change to "bridge/aws/[email protected]/sensor1/hello" when it reaches the remote node.
Forwards:
Messages forwarded to forwards
specified by local EMQ X are forwarded to the remote MQTT Broker.
Subscription: Local EMQ X synchronizes messages from a remote MQTT Broker to local by subscribing to the topic of the remote MQTT Broker.
./bin/emqx_ctl plugins load emqx_bridge_mqtt
$ cd emqx && ./bin/emqx_ctl bridges
bridges list # List bridges
bridges start <Name> # Start a bridge
bridges stop <Name> # Stop a bridge
bridges forwards <Name> # Show a bridge forward topic
bridges add-forward <Name> <Topic> # Add bridge forward topic
bridges del-forward <Name> <Topic> # Delete bridge forward topic
bridges subscriptions <Name> # Show a bridge subscriptions topic
bridges add-subscription <Name> <Topic> <Qos> # Add bridge subscriptions topic
$ ./bin/emqx_ctl bridges list
name: emqx status: Stopped
$ ./bin/emqx_ctl bridges start emqx
Start bridge successfully.
$ ./bin/emqx_ctl bridges stop emqx
Stop bridge successfully.
$ ./bin/emqx_ctl bridges forwards emqx
topic: topic1/#
topic: topic2/#
$ ./bin/emqx_ctl bridges add-forwards emqx topic3/#
Add-forward topic successfully.
$ ./bin/emqx_ctl bridges del-forwards emqx topic3/#
Del-forward topic successfully.
$ ./bin/emqx_ctl bridges subscriptions emqx
topic: cmd/topic1, qos: 1
topic: cmd/topic2, qos: 1
$ ./bin/emqx_ctl bridges add-subscription emqx cmd/topic3 1
Add-subscription topic successfully.
$ ./bin/emqx_ctl bridges del-subscription emqx cmd/topic3
Del-subscription topic successfully.
EMQ X bridges and forwards MQTT messages to remote EMQ X:
Config file for RPC bridge plugin: etc/plugins/emqx_bridge_mqtt.conf
bridge.mqtt.emqx.address = [email protected]
## Mountpoint of the bridge
bridge.mqtt.emqx.mountpoint = bridge/emqx1/${node}/
## Forward message topics
bridge.mqtt.emqx.forwards = topic1/#,topic2/#
Mountpoint:
Mountpoint is used to prefix of topic when forwarding a message, this option must be used with forwards
. Forwards the message whose topic is "sensor1/hello", its topic will change to "bridge/aws/[email protected]/sensor1/hello" when it reaches the remote node.
Forwards:
Messages forwarded to forwards
specified by local EMQ X are forwarded to the remote EMQ X.
CLI of RPC bridge is used in the same way as the MQTT bridge.