Skip to content

Commit

Permalink
Includes support for using p-rabbitmq from CLoudFoundry as MQTT
Browse files Browse the repository at this point in the history
Note: We still have an open ticket to make the mqtt public, currently it is a private ip.

Additions:
-  manifest now includes step to create the service and binding to the efc app
- created a Map for keys and values found if VCAP_SERVCIES mqtt exists
- Added connection options to pass to the connect function to include username and password to the connection.
  • Loading branch information
Daniel Berry committed May 25, 2017
1 parent 88199ce commit 5b1c09b
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 4 deletions.
2 changes: 2 additions & 0 deletions manifest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
# pushd web && npm install && npm run build:devio && popd
# pushd server && lein uberjar && popd
# if ! cf s | grep 'efc-db-test'; then cf cs pg_95_XL_DEV_SHARED_001 large-dev-100 efc-db-test -c cf/efc-db.json; fi
# if ! cf S | grep 'efc-mqtt-test'; then cf cs p-rabbitmq standard efc-mqtt-test; fi
# cf push
#############
---
Expand Down Expand Up @@ -32,3 +33,4 @@ applications:
TRUST_STORE: /app/server/tls/test-cacerts.jks
services:
- efc-db-test
- efc-mqtt-test
28 changes: 24 additions & 4 deletions server/src/spacon/components/mqtt/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,30 @@
[clojure.core.async :as async]
[spacon.components.http.auth :refer [get-token]]
[clojure.tools.logging :as log]
[spacon.components.queue.protocol :as queue])
[spacon.components.queue.protocol :as queue]
[clojure.data.json :as json])
(:import (org.eclipse.paho.client.mqttv3 MqttException)
(java.net InetAddress)))

(def vcap_mqtt
"Map of keys and values from VCAP_SERVICES mqtt environment variable"
(or (some-> (System/getenv "VCAP_SERVICES")
(json/read-str :key-fn clojure.core/keyword) :p-rabbitmq first :credentials :protocols :mqtt)
{:username
:password
:host
:port}))

(def mqtt-connect-ops
"Map of authentication keys and values for mqtt connect options"
{:username (or (System/getenv "MQTT_USERNAME") (:username vcap_mqtt))
:password (or (System/getenv "MQTT_PASSWORD") (:password vcap_mqtt))})

(def vcap_tcp
"When vcap_mqtt exists it returns a formated tcp connection"
(when (some? vcap_mqtt)
(format "%s://%s:%s" "tcp" (:host vcap_mqtt) (:port vcap_mqtt))))

This comment has been minimized.

Copy link
@mrcnc

mrcnc May 25, 2017

Contributor

We should make the scheme configurable too. If we're going over TLS, we'll need to change it to ssl:// for mosquitto....I'd have to double check if Rabbit requires this also but I'd assume so


(def client-id (or (System/getenv "MQTT_CLIENT_ID")
(subs (str "sc-" (InetAddress/getLocalHost)) 0 22 )))
(defonce conn (atom nil))
Expand Down Expand Up @@ -61,7 +81,7 @@
(log/debugf "Connecting MQTT Client to %s" url)
(try
(do
(reset! conn (mh/connect url client-id))
(reset! conn (mh/connect url client-id mqtt-connect-ops))
(log/infof "MQTT Client connected to %s" url))
(catch MqttException e
(do
Expand All @@ -82,7 +102,7 @@
(async/go (async/>!! (:subscribe-channel mqtt) {:topic topic :message (msg/from-bytes message)}))))

(defn reconnect [mqtt-comp reason ]
(let [url (or (System/getenv "MQTT_BROKER_URL") "tcp://localhost:1883")]
(let [url (or (System/getenv "MQTT_BROKER_URL") vcap_tcp "tcp://localhost:1883")]
(log/debugf "Connection lost (%s). Attempting reconnect to %s" reason url)
(connectmqtt url)
(doall (map (fn [t]
Expand Down Expand Up @@ -148,7 +168,7 @@
queue/IQueue
(start [this]
(log/debug "Starting MQTT Component")
(let [url (or (:broker-url mqtt-config) "tcp://localhost:1883")
(let [url (or (:broker-url mqtt-config) vcap_tcp "tcp://localhost:1883")
m (connectmqtt url)
pub-chan (async/chan)
sub-chan (async/chan)
Expand Down

0 comments on commit 5b1c09b

Please sign in to comment.