Skip to content

Commit

Permalink
1.2.0 新增 默认回调支持自动重新订阅之前订阅的主题
Browse files Browse the repository at this point in the history
  • Loading branch information
sliverTwo committed Nov 29, 2020
1 parent 3965876 commit 114c98c
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 47 deletions.
39 changes: 28 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jfinal-mqtt-plugin是jfinal的mqtt消息插件插件,目前基于eclipse开源
<dependency>
<groupId>com.iipcloud</groupId>
<artifactId>jfinal-mqtt-plugin</artifactId>
<version>1.0.0</version>
<version>1.2.0</version>
</dependency>
```

Expand Down Expand Up @@ -56,7 +56,7 @@ mqtt.sslProperties=
#MQTT 消息保存方式 如果不设置,默认保存在内存中,设置了则保存着指定的目录下
mqtt.stroageDir=
# 启用默认的全局回调
# 启用默认的全局回调,用于支持连接断开重连后的主题订阅恢复
mqtt.enableDefaultCallback = false
# 同时发送的最大消息数
mqtt.maxInflight = 1000
Expand All @@ -80,14 +80,31 @@ mqtt.maxInflight = 1000

### 参数说明

|参数名 |类型 |说明 |
|-- |-- |-- |
|topic |String |发布或订阅的消息主题 |
|qos |int |消息的质量 MqttKit.QOS_AT_MOST_ONCE(最多一次) QOS_AT_LEAST_ONCE(最少一次) QOS_EXACTLY_ONCE(只有一次) 注:消息质量取的是发布者和订阅者中最低的一个 |
|messageListener|IMqttMessageListener |收到指定主题消息后的回调 |
|paylod |byte或者Kv |发送的消息内容 |
|retained |boolean |是否持久化 如果设为true 服务器会将该消息发送给当前的订阅者,还会将这个消息推送给新订阅这个题注的订阅者 |
|timeout |long | 超时时间 |
|message |MqttMessage |消息体 |
| 参数名 | 类型 | 说明 |
| --------------- | -------------------- | ------------------------------------------------------------ |
| topic | String | 发布或订阅的消息主题 |
| qos | int | 消息的质量 MqttKit.QOS_AT_MOST_ONCE(最多一次) QOS_AT_LEAST_ONCE(最少一次) QOS_EXACTLY_ONCE(只有一次) 注:消息质量取的是发布者和订阅者中最低的一个 |
| messageListener | IMqttMessageListener | 收到指定主题消息后的回调 |
| paylod | byte\|Kv | 发送的消息内容 |
| retained | boolean | 是否持久化 如果设为true 服务器会将该消息发送给当前的订阅者,还会将这个消息推送给新订阅这个题注的订阅者 |
| timeout | long | 超时时间 |
| message | MqttMessage | 消息体 |



## 注意事项

1. 回调事件为同步回调,不要在收到消息的回调中发布消息,否则会导致死锁,如需发布消息,请新开一个线程.
2. 回调事件中抛出异常会导致连接断开.



## 更新日志

### 20201129 1.2.0

* 新增 默认回调支持自动重新订阅之前订阅的主题

### 20201021 1.1.0

* 添加对emq共享主题的支持
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>com.iipcloud</groupId>
<artifactId>jfinal-mqtt-plugin</artifactId>
<version>1.1.1</version>
<version>1.2.0</version>
<packaging>jar</packaging>

<name>jfinal-mqtt-plugin</name>
Expand Down
56 changes: 35 additions & 21 deletions src/main/java/com/iipcloud/jfinal/plugin/mqtt/DefaultCallback.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,48 +3,62 @@
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttAsyncClient;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;

import com.iipcloud.jfinal.plugin.mqtt.MqttPro.ListenerWrapper;
import com.jfinal.log.Log;

public class DefaultCallback implements MqttCallback {
private static Log logger = Log.getLog(DefaultCallback.class);
private MqttAsyncClient client;
private MqttPro mqttPro;

public DefaultCallback(MqttAsyncClient client) {
public DefaultCallback(MqttPro mqttPro) {
super();
this.client = client;
this.mqttPro = mqttPro;
}

@Override
public void connectionLost(Throwable cause) {
// 连接丢失后,一般在这里面进行重连
logger.info("[MQTT] 连接断开,10S之后尝试重连...");
// 打印连接原因
cause.printStackTrace();
for (int i = 1; i < 1000; i++) {
try {
Thread.sleep(10000);
if (client.isConnected()) {
break;
new Thread(() -> {
logger.info("[MQTT] 连接断开,1s之后尝试重连...", cause);
MqttAsyncClient client = mqttPro.getClient();
boolean reconnecting = false;
for (int i = 1; i < 1000; i++) {
try {
if (client.isConnected()) {
break;
}
Thread.sleep(1000);
boolean needReconnect = !mqttPro.config.isAutomaticReconnection() && !reconnecting && !client.isConnected();
if (needReconnect) {
logger.info("开始重连...");
client.reconnect();
reconnecting = true;
}
} catch (Exception e) {
logger.info("mqtt重连失败,继续重连,reason:" + e.getMessage(), e);
continue;
}
logger.info("mqtt第" + i + "次重连开始...");
client.reconnect();
break;
} catch (Exception e) {
logger.info("mqtt重连失败,继续重连");
e.printStackTrace();
continue;
}
}
logger.info("mqtt重连成功");
reconnecting = false;
logger.info("开始重新订阅主题");
for (ListenerWrapper l : mqttPro.callbacks.values()) {
try {
mqttPro.subscribe(l.getTopic(), l.getQos(), l.getListener(), l.getTimeout());
} catch (MqttException e) {
logger.error("主题订阅失败,topic:%s,reason:%s", l.getTopic(), e.getMessage(), e);
}
}
logger.info("主题重新订阅完成");
}).start();
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
logger.info("主题:" + topic);
logger.info("message:" + message);

}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/iipcloud/jfinal/plugin/mqtt/MqttKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ static void init(String configName, MqttPro mqttPro) {
public static MqttPro use(String configName) {
MqttPro mqttPro = proMap.get(configName);
if (mqttPro == null) {
throw new RuntimeException(configName + "配置的Mail不存在!");
throw new RuntimeException(configName + "配置的mqtt client不存在!");
}
return mqttPro;
}
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/iipcloud/jfinal/plugin/mqtt/MqttPlugin.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,10 @@ public boolean stop() {
* @param topic
* @param payload
* @param qos
* @param retainedd
* @param retained
*/
public void setWill(String topic, String payload, int qos, boolean retainedd) {
this.mqttPro.setWill(topic, payload, qos, retainedd);
public void setWill(String topic, String payload, int qos, boolean retained) {
this.mqttPro.setWill(topic, payload, qos, retained);
}

/**
Expand Down
63 changes: 59 additions & 4 deletions src/main/java/com/iipcloud/jfinal/plugin/mqtt/MqttPro.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
*/
package com.iipcloud.jfinal.plugin.mqtt;

import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;

import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
Expand Down Expand Up @@ -37,9 +39,9 @@
* @date 2019年6月6日
*/
public class MqttPro {

private static Log logger = Log.getLog(MqttPro.class);
private MqttConfig config;
Map<String,ListenerWrapper> callbacks;
MqttConfig config;
/** scheduler 自动重连的任务调度器 */
private ScheduledExecutorService scheduler;
/** client 异步MQTT Client */
Expand Down Expand Up @@ -98,6 +100,7 @@ public boolean subscribe(String topic, int qos, IMqttMessageListener messageList
String pureTopic = getPureTopic(topic);
return subscribe(pureTopic, qos, messageListener, timeout);
}
callbacks.put(topic, new ListenerWrapper(topic, qos, messageListener, timeout));
return true;
} else {
return false;
Expand Down Expand Up @@ -146,7 +149,11 @@ private static boolean isEmqShareTopic(String topic) {
public boolean unsubscribe(String topic) throws MqttException {
IMqttToken token = client.unsubscribe(topic);
token.waitForCompletion();
return token.isComplete();
boolean complete = token.isComplete();
if(complete) {
callbacks.remove(topic);
}
return complete;
}

/**
Expand Down Expand Up @@ -231,7 +238,7 @@ public boolean start() {
this.client.setManualAcks(this.config.isManualAcks());
// 启用默认回调
if (this.config.isEnableDefaultCallback()) {
this.client.setCallback(new DefaultCallback(this.client));
this.client.setCallback(new DefaultCallback(this));
}
this.options = new MqttConnectOptions();
// 自动重连设置
Expand Down Expand Up @@ -288,6 +295,7 @@ public boolean start() {
LogKit.error("MQTT Clinet连接MQTT Broker连接失败", e);
return false;
}
callbacks = new ConcurrentHashMap<>();
return true;
}

Expand Down Expand Up @@ -342,5 +350,52 @@ public MqttAsyncClient getClient() {
public void setCallback(MqttCallback callback) {
this.client.setCallback(callback);
}

public MqttConfig getConfig() {
return config;
}

static class ListenerWrapper{
private String topic;
private int qos;
private long timeout;
private IMqttMessageListener listener;

public ListenerWrapper(String topic, int qos, IMqttMessageListener listener, long timeout) {
super();
this.topic = topic;
this.qos = qos;
this.listener = listener;
this.timeout = timeout;
}
public String getTopic() {
return topic;
}
public int getQos() {
return qos;
}
public ListenerWrapper setQos(int qos) {
this.qos = qos;
return this;
}
public long getTimeout() {
return timeout;
}
public ListenerWrapper setTimeout(long timeout) {
this.timeout = timeout;
return this;
}
public IMqttMessageListener getListener() {
return listener;
}
public ListenerWrapper setListener(IMqttMessageListener listener) {
this.listener = listener;
return this;
}
public ListenerWrapper setTopic(String topic) {
this.topic = topic;
return this;
}

}
}
4 changes: 2 additions & 2 deletions src/main/resources/mqtt.properties
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#JFinal-mqtt\u63d2\u4ef6\u914d\u7f6e\u6587\u4ef6

#MQTT Broker\u8fde\u63a5\u5730\u5740 \u9ed8\u8ba4\u5730\u5740:tcp://127.0.0.1:1883
mqtt.brokerURL=tcp://192.168.0.188:1883
mqtt.brokerURL=tcp://192.168.0.155:1883
#mqtt.brokerURL=tcp://127.0.0.1:1883

#MQTT ClientId \u9ed8\u8ba4\u503c:"jf_mq_p_"+System.nanoTime()
Expand All @@ -22,7 +22,7 @@ mqtt.automaticReconnection=true
mqtt.reConnectionTimeInterval=5

#\u662f\u5426\u6e05\u7406session\uff0cfalse\u65f6\u53ef\u63a5\u6536\u79bb\u7ebf\u6d88\u606f true\u5ffd\u7565\u79bb\u7ebf\u6d88\u606f \u9ed8\u8ba4true
mqtt.cleanSession=true
mqtt.cleanSession=false

#\u8fde\u63a5MQTT Broker\u8d85\u65f6\u65f6\u95f4 \u9ed8\u8ba4:30s
mqtt.connectionTimeout=60
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
*/
package com.iipcloud.jfinal.plugin.mqtt.test;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
Expand Down Expand Up @@ -41,22 +42,27 @@ public static void start() {
MqttPlugin mqttPlugin = new MqttPlugin("mqtt.properties");

plugins.add(mqttPlugin);
for (int i = 0; i < 10; i++) {
plugins.add(new MqttPlugin(String.valueOf(i), "mqtt.properties"));
}
/*
* for (int i = 0; i < 10; i++) {
* plugins.add(new MqttPlugin(String.valueOf(i), "mqtt.properties"));
* }
*/
for (IPlugin plugin : plugins.getPluginList()) {
plugin.start();
}
}

@Test
public void test() throws MqttException {
public void test() throws MqttException, InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
MqttKit.subscribe("$queue/test", 0, new IMqttMessageListener() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println(message);
throw new Exception("10086");
}
});
latch.await();
}

public static void main(String[] args) {
Expand Down

0 comments on commit 114c98c

Please sign in to comment.