Skip to content

Commit

Permalink
refactor: subscribe MQTT channels with downlink_enabled
Browse files Browse the repository at this point in the history
  • Loading branch information
andrekir committed Oct 16, 2023
1 parent 5ece09b commit 47bc921
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 7 deletions.
6 changes: 6 additions & 0 deletions app/src/main/java/com/geeksville/mesh/model/ChannelSet.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,12 @@ fun Uri.toChannelSet(): ChannelSet {
return ChannelSet.parseFrom(bytes)
}

/**
* @return A list of globally unique channel IDs usable with MQTT subscribe()
*/
val ChannelSet.subscribeList: List<String>
get() = settingsList.filter { it.downlinkEnabled }.map { Channel(it, loraConfig).name }

/**
* Return the primary channel info
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.geeksville.mesh.repository.network

import com.geeksville.mesh.MeshProtos.MqttClientProxyMessage
import com.geeksville.mesh.ModuleConfigProtos.ModuleConfig.MQTTConfig
import com.geeksville.mesh.android.Logging
import com.geeksville.mesh.model.subscribeList
import com.geeksville.mesh.mqttClientProxyMessage
import com.geeksville.mesh.repository.datastore.ChannelSetRepository
import com.geeksville.mesh.repository.datastore.ModuleConfigRepository
import com.geeksville.mesh.util.ignoreException
import com.google.protobuf.ByteString
Expand All @@ -27,6 +28,7 @@ import javax.net.ssl.TrustManager

@Singleton
class MQTTRepository @Inject constructor(
private val channelSetRepository: ChannelSetRepository,
private val moduleConfigRepository: ModuleConfigRepository,
) : Logging {

Expand All @@ -39,19 +41,23 @@ class MQTTRepository @Inject constructor(
*/
private const val DEFAULT_QOS = 1
private const val DEFAULT_TOPIC_ROOT = "msh"
private const val VERSION_TOPIC_LEVEL = "/2/c/#"
private const val STAT_TOPIC_LEVEL = "/2/stat/"
private const val DEFAULT_TOPIC_LEVEL = "/2/c/"
private const val JSON_TOPIC_LEVEL = "/2/json/"
private const val DEFAULT_SERVER_ADDRESS = "mqtt.meshtastic.org"
}

private var mqttClient: MqttAsyncClient? = null

suspend fun connect(callback: MqttCallbackExtended) {
val mqttConfig: MQTTConfig = moduleConfigRepository.fetchInitialModuleConfig().mqtt
val channelSet = channelSetRepository.fetchInitialChannelSet() ?: return
val mqttConfig = moduleConfigRepository.fetchInitialModuleConfig().mqtt

val sslContext = SSLContext.getInstance("TLS")
// Create a custom SSLContext that trusts all certificates
sslContext.init(null, arrayOf<TrustManager>(TrustAllX509TrustManager()), SecureRandom())

// val stat = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + STAT_TOPIC_LEVEL + ownerId
val connectOptions = MqttConnectOptions().apply {
userName = mqttConfig.username
password = mqttConfig.password.toCharArray()
Expand All @@ -60,11 +66,12 @@ class MQTTRepository @Inject constructor(
if (mqttConfig.tlsEnabled) {
socketFactory = sslContext.socketFactory
}
// setWill(stat, "offline".encodeToByteArray(), DEFAULT_QOS, true)
}

val bufferOptions = DisconnectedBufferOptions().apply {
isBufferEnabled = true
bufferSize = 100
bufferSize = 512
isPersistBuffer = false
isDeleteOldestMessages = true
}
Expand All @@ -75,7 +82,8 @@ class MQTTRepository @Inject constructor(

val serverURI: String = URI(scheme, null, host, port, "", "", "").toString()

val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } + VERSION_TOPIC_LEVEL
val topic = mqttConfig.root.ifEmpty { DEFAULT_TOPIC_ROOT } +
DEFAULT_TOPIC_LEVEL // FIXME if (mqttConfig.jsonEnabled) JSON_TOPIC_LEVEL else DEFAULT_TOPIC_LEVEL

mqttClient = MqttAsyncClient(
serverURI,
Expand All @@ -86,8 +94,12 @@ class MQTTRepository @Inject constructor(
setCallback(callback)
setBufferOpts(bufferOptions)
connect(connectOptions).waitForCompletion()
subscribe(topic, DEFAULT_QOS).waitForCompletion()
info("MQTT Subscribed to topic: $topic")

channelSet.subscribeList.forEach { globalId ->
val topicFilter = "$topic$globalId/#"
subscribe(topicFilter, DEFAULT_QOS).waitForCompletion()
info("MQTT Subscribed to topic: $topicFilter")
}
}
}

Expand Down

0 comments on commit 47bc921

Please sign in to comment.