From 5b1c09b0a9f0d4618e1b0e76b22f370840b85d53 Mon Sep 17 00:00:00 2001 From: Daniel Berry Date: Thu, 25 May 2017 09:10:45 -0500 Subject: [PATCH] Includes support for using p-rabbitmq from CLoudFoundry as MQTT Note: We still have an open ticket to make the mqtt public, currently it is a private ip. Additions: - manifest now includes step to create the service and binding to the efc app - created a Map for keys and values found if VCAP_SERVCIES mqtt exists - Added connection options to pass to the connect function to include username and password to the connection. --- manifest.yml | 2 ++ server/src/spacon/components/mqtt/core.clj | 28 ++++++++++++++++++---- 2 files changed, 26 insertions(+), 4 deletions(-) diff --git a/manifest.yml b/manifest.yml index b8816b0..6efd5a1 100644 --- a/manifest.yml +++ b/manifest.yml @@ -5,6 +5,7 @@ # pushd web && npm install && npm run build:devio && popd # pushd server && lein uberjar && popd # if ! cf s | grep 'efc-db-test'; then cf cs pg_95_XL_DEV_SHARED_001 large-dev-100 efc-db-test -c cf/efc-db.json; fi +# if ! cf S | grep 'efc-mqtt-test'; then cf cs p-rabbitmq standard efc-mqtt-test; fi # cf push ############# --- @@ -32,3 +33,4 @@ applications: TRUST_STORE: /app/server/tls/test-cacerts.jks services: - efc-db-test + - efc-mqtt-test diff --git a/server/src/spacon/components/mqtt/core.clj b/server/src/spacon/components/mqtt/core.clj index 6acec67..9cd886d 100644 --- a/server/src/spacon/components/mqtt/core.clj +++ b/server/src/spacon/components/mqtt/core.clj @@ -21,10 +21,30 @@ [clojure.core.async :as async] [spacon.components.http.auth :refer [get-token]] [clojure.tools.logging :as log] - [spacon.components.queue.protocol :as queue]) + [spacon.components.queue.protocol :as queue] + [clojure.data.json :as json]) (:import (org.eclipse.paho.client.mqttv3 MqttException) (java.net InetAddress))) +(def vcap_mqtt + "Map of keys and values from VCAP_SERVICES mqtt environment variable" + (or (some-> (System/getenv "VCAP_SERVICES") + (json/read-str :key-fn clojure.core/keyword) :p-rabbitmq first :credentials :protocols :mqtt) + {:username + :password + :host + :port})) + +(def mqtt-connect-ops + "Map of authentication keys and values for mqtt connect options" + {:username (or (System/getenv "MQTT_USERNAME") (:username vcap_mqtt)) + :password (or (System/getenv "MQTT_PASSWORD") (:password vcap_mqtt))}) + +(def vcap_tcp + "When vcap_mqtt exists it returns a formated tcp connection" + (when (some? vcap_mqtt) + (format "%s://%s:%s" "tcp" (:host vcap_mqtt) (:port vcap_mqtt)))) + (def client-id (or (System/getenv "MQTT_CLIENT_ID") (subs (str "sc-" (InetAddress/getLocalHost)) 0 22 ))) (defonce conn (atom nil)) @@ -61,7 +81,7 @@ (log/debugf "Connecting MQTT Client to %s" url) (try (do - (reset! conn (mh/connect url client-id)) + (reset! conn (mh/connect url client-id mqtt-connect-ops)) (log/infof "MQTT Client connected to %s" url)) (catch MqttException e (do @@ -82,7 +102,7 @@ (async/go (async/>!! (:subscribe-channel mqtt) {:topic topic :message (msg/from-bytes message)})))) (defn reconnect [mqtt-comp reason ] - (let [url (or (System/getenv "MQTT_BROKER_URL") "tcp://localhost:1883")] + (let [url (or (System/getenv "MQTT_BROKER_URL") vcap_tcp "tcp://localhost:1883")] (log/debugf "Connection lost (%s). Attempting reconnect to %s" reason url) (connectmqtt url) (doall (map (fn [t] @@ -148,7 +168,7 @@ queue/IQueue (start [this] (log/debug "Starting MQTT Component") - (let [url (or (:broker-url mqtt-config) "tcp://localhost:1883") + (let [url (or (:broker-url mqtt-config) vcap_tcp "tcp://localhost:1883") m (connectmqtt url) pub-chan (async/chan) sub-chan (async/chan)