Skip to content

Commit

Permalink
[INLONG-10286][Agent] Update the MQTT Source (apache#10727)
Browse files Browse the repository at this point in the history
  • Loading branch information
qy-liuhuo authored Jul 30, 2024
1 parent 381cbe5 commit 3822ed2
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,20 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_POSTGRES_PLUGIN_NAME = "task.postgreSQLTask.pluginName";
public static final String TASK_POSTGRES_SNAPSHOT_MODE = "task.postgreSQLTask.snapshotMode";

// MQTT
public static final String TASK_MQTT_USERNAME = "task.mqttTask.userName";
public static final String TASK_MQTT_PASSWORD = "task.mqttTask.password";
public static final String TASK_MQTT_SERVER_URI = "task.mqttTask.serverURI";
public static final String TASK_MQTT_TOPIC = "task.mqttTask.topic";
public static final String TASK_MQTT_CONNECTION_TIMEOUT = "task.mqttTask.connectionTimeOut";
public static final String TASK_MQTT_KEEPALIVE_INTERVAL = "task.mqttTask.keepAliveInterval";
public static final String TASK_MQTT_QOS = "task.mqttTask.qos";
public static final String TASK_MQTT_CLEAN_SESSION = "task.mqttTask.cleanSession";
public static final String TASK_MQTT_CLIENT_ID_PREFIX = "task.mqttTask.clientIdPrefix";
public static final String TASK_MQTT_QUEUE_SIZE = "task.mqttTask.queueSize";
public static final String TASK_MQTT_AUTOMATIC_RECONNECT = "task.mqttTask.automaticReconnect";
public static final String TASK_MQTT_VERSION = "task.mqttTask.mqttVersion";

public static final String TASK_STATE = "task.state";

public static final String INSTANCE_STATE = "instance.state";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public class TaskProfileDto {
public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask";
public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask";
public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask";
public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
Expand Down Expand Up @@ -513,6 +514,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
profileDto.setTask(task);
break;
case MQTT:
task.setTaskClass(DEFAULT_MQTT_TASK);
MqttTask mqttTask = getMqttTask(dataConfig);
task.setMqttTask(mqttTask);
task.setSource(MQTT_SOURCE);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.plugin.instance;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;

public class MqttInstance extends CommonInstance {

@Override
public void setInodeInfo(InstanceProfile profile) {
profile.set(TaskConstants.INODE_INFO, "");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,96 +18,171 @@
package org.apache.inlong.agent.plugin.sources;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.except.FileException;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.plugin.Message;
import org.apache.inlong.agent.plugin.file.Reader;
import org.apache.inlong.agent.plugin.sources.file.AbstractSource;
import org.apache.inlong.agent.plugin.sources.reader.MqttReader;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MqttSource extends AbstractSource {

private static final Logger LOGGER = LoggerFactory.getLogger(MqttSource.class);

private static final String JOB_MQTTJOB_PARAM_PREFIX = "job.mqttJob.";
private MqttClient client;
private LinkedBlockingQueue<DefaultMessage> mqttMessagesQueue;
private String serverURI;

private static final String JOB_MQTTJOB_SERVERURI = "";
private String topic;

private static final String JOB_MQTTJOB_CLIENTID = "";
private int qos;

public static final String JOB_MQTTJOB_TOPICS = "job.mqttJob.topic";
private String clientId;

MqttConnectOptions options;

public MqttSource() {
}

private List<Reader> splitSqlJob(String topics, String instanceId) {
if (StringUtils.isEmpty(topics)) {
return null;
}
final List<Reader> result = new ArrayList<>();
String[] topicList = topics.split(CommonConstants.COMMA);
if (Objects.nonNull(topicList)) {
Arrays.stream(topicList).forEach(topic -> {
MqttReader mqttReader = new MqttReader(topic);
mqttReader.setReadSource(instanceId);
result.add(mqttReader);
});
}
return result;
@Override
protected String getThreadName() {
return "mqtt-source-" + taskId + "-" + instanceId;
}

@Override
public List<Reader> split(TaskProfile conf) {
String topics = conf.get(JOB_MQTTJOB_TOPICS, StringUtils.EMPTY);
List<Reader> readerList = null;
if (StringUtils.isNotEmpty(topics)) {
}
if (CollectionUtils.isNotEmpty(readerList)) {
sourceMetric.sourceSuccessCount.incrementAndGet();
} else {
sourceMetric.sourceFailCount.incrementAndGet();
protected void initSource(InstanceProfile profile) {
try {
LOGGER.info("MqttSource init: {}", profile.toJsonStr());
mqttMessagesQueue = new LinkedBlockingQueue<>(profile.getInt(TaskConstants.TASK_MQTT_QUEUE_SIZE, 1000));
serverURI = profile.get(TaskConstants.TASK_MQTT_SERVER_URI);
instanceId = profile.getInstanceId();
topic = profile.get(TaskConstants.TASK_MQTT_TOPIC);
qos = profile.getInt(TaskConstants.TASK_MQTT_QOS, 1);
clientId = profile.get(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_" + UUID.randomUUID();
initConnectOptions(profile);
mqttConnect();
} catch (Exception e) {
stopRunning();
throw new FileException("error init stream for {}" + topic, e);
}
return readerList;
}

@Override
protected String getThreadName() {
return null;
private void initConnectOptions(InstanceProfile profile) {
options = new MqttConnectOptions();
options.setCleanSession(profile.getBoolean(TaskConstants.TASK_MQTT_CLEAN_SESSION, false));
options.setConnectionTimeout(profile.getInt(TaskConstants.TASK_MQTT_CONNECTION_TIMEOUT, 10));
options.setKeepAliveInterval(profile.getInt(TaskConstants.TASK_MQTT_KEEPALIVE_INTERVAL, 20));
options.setUserName(profile.get(TaskConstants.TASK_MQTT_USERNAME, ""));
options.setPassword(profile.get(TaskConstants.TASK_MQTT_PASSWORD, "").toCharArray());
options.setAutomaticReconnect(profile.getBoolean(TaskConstants.TASK_MQTT_AUTOMATIC_RECONNECT, true));
options.setMqttVersion(
profile.getInt(TaskConstants.TASK_MQTT_VERSION, MqttConnectOptions.MQTT_VERSION_DEFAULT));
}

@Override
protected void initSource(InstanceProfile profile) {
private void mqttConnect() {
try {
client = new MqttClient(serverURI, clientId, new MemoryPersistence());
client.setCallback(new MqttCallback() {

@Override
public void connectionLost(Throwable cause) {
LOGGER.error("the mqtt jobId:{}, serverURI:{}, connection lost, {} ", instanceId,
serverURI, cause);
reconnect();
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
Map<String, String> headerMap = new HashMap<>();
headerMap.put("record.topic", topic);
headerMap.put("record.messageId", String.valueOf(message.getId()));
headerMap.put("record.qos", String.valueOf(message.getQos()));
byte[] recordValue = message.getPayload();
mqttMessagesQueue.offer(new DefaultMessage(recordValue, headerMap), 1, TimeUnit.SECONDS);

}

@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
client.connect(options);
client.subscribe(topic, qos);
LOGGER.info("the mqtt subscribe topic is [{}], qos is [{}]", topic, qos);
} catch (Exception e) {
LOGGER.error("init mqtt client error {}. jobId:{},serverURI:{},clientId:{}", e, instanceId, serverURI,
clientId);
}
}

private void reconnect() {
if (!client.isConnected()) {
try {
client.connect(options);
LOGGER.info("the mqtt client reconnect success. jobId:{}, serverURI:{}, clientId:{}", instanceId,
serverURI, clientId);
} catch (Exception e) {
LOGGER.error("reconnect mqtt client error {}. jobId:{}, serverURI:{}, clientId:{}", e, instanceId,
serverURI, clientId);
}
}
}

private void disconnect() {
try {
client.disconnect();
} catch (MqttException e) {
LOGGER.error("disconnect mqtt client error {}. jobId:{},serverURI:{},clientId:{}", e, instanceId, serverURI,
clientId);
}
}

@Override
protected void printCurrentState() {

LOGGER.info("mqtt topic is {}", topic);
}

@Override
protected boolean doPrepareToRead() {
return false;
return true;
}

@Override
protected List<SourceData> readFromSource() {
return null;
}

@Override
public Message read() {
return null;
List<SourceData> dataList = new ArrayList<>();
try {
int size = 0;
while (size < BATCH_READ_LINE_TOTAL_LEN) {
Message msg = mqttMessagesQueue.poll(1, TimeUnit.SECONDS);
if (msg != null) {
SourceData sourceData = new SourceData(msg.getBody(), "0L");
size += sourceData.getData().length;
dataList.add(sourceData);
} else {
break;
}
}
} catch (InterruptedException e) {
LOGGER.error("poll {} data from mqtt queue interrupted.", instanceId);
}
return dataList;
}

@Override
Expand All @@ -117,16 +192,14 @@ protected boolean isRunnable() {

@Override
protected void releaseSource() {

}

@Override
public boolean sourceFinish() {
return false;
LOGGER.info("release mqtt source");
if (client.isConnected()) {
disconnect();
}
}

@Override
public boolean sourceExist() {
return false;
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.inlong.agent.plugin.sources.reader;

import org.apache.inlong.agent.conf.InstanceProfile;
import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.message.DefaultMessage;
import org.apache.inlong.agent.metrics.audit.AuditUtils;
import org.apache.inlong.agent.plugin.Message;
Expand All @@ -43,19 +44,6 @@ public class MqttReader extends AbstractReader {

private static final Logger LOGGER = LoggerFactory.getLogger(MqttReader.class);

public static final String JOB_MQTT_USERNAME = "job.mqttJob.userName";
public static final String JOB_MQTT_PASSWORD = "job.mqttJob.password";
public static final String JOB_MQTT_SERVER_URI = "job.mqttJob.serverURI";
public static final String JOB_MQTT_TOPIC = "job.mqttJob.topic";
public static final String JOB_MQTT_CONNECTION_TIMEOUT = "job.mqttJob.connectionTimeOut";
public static final String JOB_MQTT_KEEPALIVE_INTERVAL = "job.mqttJob.keepAliveInterval";
public static final String JOB_MQTT_QOS = "job.mqttJob.qos";
public static final String JOB_MQTT_CLEAN_SESSION = "job.mqttJob.cleanSession";
public static final String JOB_MQTT_CLIENT_ID_PREFIX = "job.mqttJob.clientIdPrefix";
public static final String JOB_MQTT_QUEUE_SIZE = "job.mqttJob.queueSize";
public static final String JOB_MQTT_AUTOMATIC_RECONNECT = "job.mqttJob.automaticReconnect";
public static final String JOB_MQTT_VERSION = "job.mqttJob.mqttVersion";

private boolean finished = false;

private boolean destroyed = false;
Expand Down Expand Up @@ -88,22 +76,20 @@ public MqttReader(String topic) {
* @param jobConf
*/
private void setGlobalParamsValue(InstanceProfile jobConf) {
mqttMessagesQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_MQTT_QUEUE_SIZE, 1000));
mqttMessagesQueue = new LinkedBlockingQueue<>(jobConf.getInt(TaskConstants.TASK_MQTT_QUEUE_SIZE, 1000));
instanceId = jobConf.getInstanceId();
userName = jobConf.get(JOB_MQTT_USERNAME);
password = jobConf.get(JOB_MQTT_PASSWORD);
serverURI = jobConf.get(JOB_MQTT_SERVER_URI);
topic = jobConf.get(JOB_MQTT_TOPIC);
clientId = jobConf.get(JOB_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_" + UUID.randomUUID();
cleanSession = jobConf.getBoolean(JOB_MQTT_CLEAN_SESSION, false);
automaticReconnect = jobConf.getBoolean(JOB_MQTT_AUTOMATIC_RECONNECT, true);
qos = jobConf.getInt(JOB_MQTT_QOS, 1);
mqttVersion = jobConf.getInt(JOB_MQTT_VERSION, MqttConnectOptions.MQTT_VERSION_DEFAULT);

userName = jobConf.get(TaskConstants.TASK_MQTT_USERNAME);
password = jobConf.get(TaskConstants.TASK_MQTT_PASSWORD);
serverURI = jobConf.get(TaskConstants.TASK_MQTT_SERVER_URI);
clientId = jobConf.get(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_" + UUID.randomUUID();
cleanSession = jobConf.getBoolean(TaskConstants.TASK_MQTT_CLEAN_SESSION, false);
automaticReconnect = jobConf.getBoolean(TaskConstants.TASK_MQTT_AUTOMATIC_RECONNECT, true);
qos = jobConf.getInt(TaskConstants.TASK_MQTT_QOS, 1);
mqttVersion = jobConf.getInt(TaskConstants.TASK_MQTT_VERSION, MqttConnectOptions.MQTT_VERSION_DEFAULT);
options = new MqttConnectOptions();
options.setCleanSession(cleanSession);
options.setConnectionTimeout(jobConf.getInt(JOB_MQTT_CONNECTION_TIMEOUT, 10));
options.setKeepAliveInterval(jobConf.getInt(JOB_MQTT_KEEPALIVE_INTERVAL, 20));
options.setConnectionTimeout(jobConf.getInt(TaskConstants.TASK_MQTT_CONNECTION_TIMEOUT, 10));
options.setKeepAliveInterval(jobConf.getInt(TaskConstants.TASK_MQTT_KEEPALIVE_INTERVAL, 20));
options.setUserName(userName);
options.setPassword(password.toCharArray());
options.setAutomaticReconnect(automaticReconnect);
Expand All @@ -114,6 +100,7 @@ private void setGlobalParamsValue(InstanceProfile jobConf) {
* connect to MQTT Broker
*/
private void connect() {

try {
synchronized (MqttReader.class) {
client = new MqttClient(serverURI, clientId, new MemoryPersistence());
Expand Down
Loading

0 comments on commit 3822ed2

Please sign in to comment.