From 3822ed22542f34b41e8d0404eac890e710d3a9aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=A9=AC=E6=B5=A9=E5=A4=A9?= <60374114+qy-liuhuo@users.noreply.github.com> Date: Tue, 30 Jul 2024 18:59:11 +0800 Subject: [PATCH] [INLONG-10286][Agent] Update the MQTT Source (#10727) --- .../inlong/agent/constant/TaskConstants.java | 14 ++ .../inlong/agent/pojo/TaskProfileDto.java | 2 + .../agent/plugin/instance/MqttInstance.java | 29 +++ .../agent/plugin/sources/MqttSource.java | 185 ++++++++++++------ .../plugin/sources/reader/MqttReader.java | 39 ++-- .../inlong/agent/plugin/task/MqttTask.java | 103 ++++++++++ .../agent/plugin/sources/TestMqttConnect.java | 14 +- .../agent/plugin/sources/TestMqttReader.java | 13 +- .../agent/plugin/sources/TestMqttSource.java | 3 +- 9 files changed, 306 insertions(+), 96 deletions(-) create mode 100644 inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MqttInstance.java create mode 100644 inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java index 1607742556a..4cd6ac56ed4 100755 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/constant/TaskConstants.java @@ -139,6 +139,20 @@ public class TaskConstants extends CommonConstants { public static final String TASK_POSTGRES_PLUGIN_NAME = "task.postgreSQLTask.pluginName"; public static final String TASK_POSTGRES_SNAPSHOT_MODE = "task.postgreSQLTask.snapshotMode"; + // MQTT + public static final String TASK_MQTT_USERNAME = "task.mqttTask.userName"; + public static final String TASK_MQTT_PASSWORD = "task.mqttTask.password"; + public static final String TASK_MQTT_SERVER_URI = "task.mqttTask.serverURI"; + public static final String TASK_MQTT_TOPIC = "task.mqttTask.topic"; + public static final String TASK_MQTT_CONNECTION_TIMEOUT = "task.mqttTask.connectionTimeOut"; + public static final String TASK_MQTT_KEEPALIVE_INTERVAL = "task.mqttTask.keepAliveInterval"; + public static final String TASK_MQTT_QOS = "task.mqttTask.qos"; + public static final String TASK_MQTT_CLEAN_SESSION = "task.mqttTask.cleanSession"; + public static final String TASK_MQTT_CLIENT_ID_PREFIX = "task.mqttTask.clientIdPrefix"; + public static final String TASK_MQTT_QUEUE_SIZE = "task.mqttTask.queueSize"; + public static final String TASK_MQTT_AUTOMATIC_RECONNECT = "task.mqttTask.automaticReconnect"; + public static final String TASK_MQTT_VERSION = "task.mqttTask.mqttVersion"; + public static final String TASK_STATE = "task.state"; public static final String INSTANCE_STATE = "instance.state"; diff --git a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java index 1bd806254b2..cc6cfe82446 100644 --- a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java +++ b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/pojo/TaskProfileDto.java @@ -53,6 +53,7 @@ public class TaskProfileDto { public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask"; public static final String DEFAULT_MONGODB_TASK = "org.apache.inlong.agent.plugin.task.MongoDBTask"; public static final String DEFAULT_POSTGRESQL_TASK = "org.apache.inlong.agent.plugin.task.PostgreSQLTask"; + public static final String DEFAULT_MQTT_TASK = "org.apache.inlong.agent.plugin.task.MqttTask"; public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel"; public static final String MANAGER_JOB = "MANAGER_JOB"; public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink"; @@ -513,6 +514,7 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) { profileDto.setTask(task); break; case MQTT: + task.setTaskClass(DEFAULT_MQTT_TASK); MqttTask mqttTask = getMqttTask(dataConfig); task.setMqttTask(mqttTask); task.setSource(MQTT_SOURCE); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MqttInstance.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MqttInstance.java new file mode 100644 index 00000000000..ec4067f4e18 --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/instance/MqttInstance.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.instance; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.TaskConstants; + +public class MqttInstance extends CommonInstance { + + @Override + public void setInodeInfo(InstanceProfile profile) { + profile.set(TaskConstants.INODE_INFO, ""); + } +} diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java index a1c4af9be76..144a1e6cc4f 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/MqttSource.java @@ -18,96 +18,171 @@ package org.apache.inlong.agent.plugin.sources; import org.apache.inlong.agent.conf.InstanceProfile; -import org.apache.inlong.agent.conf.TaskProfile; -import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.except.FileException; +import org.apache.inlong.agent.message.DefaultMessage; import org.apache.inlong.agent.plugin.Message; -import org.apache.inlong.agent.plugin.file.Reader; import org.apache.inlong.agent.plugin.sources.file.AbstractSource; -import org.apache.inlong.agent.plugin.sources.reader.MqttReader; -import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttException; +import org.eclipse.paho.client.mqttv3.MqttMessage; +import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; -import java.util.Arrays; +import java.util.HashMap; import java.util.List; -import java.util.Objects; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; public class MqttSource extends AbstractSource { private static final Logger LOGGER = LoggerFactory.getLogger(MqttSource.class); - private static final String JOB_MQTTJOB_PARAM_PREFIX = "job.mqttJob."; + private MqttClient client; + private LinkedBlockingQueue mqttMessagesQueue; + private String serverURI; - private static final String JOB_MQTTJOB_SERVERURI = ""; + private String topic; - private static final String JOB_MQTTJOB_CLIENTID = ""; + private int qos; - public static final String JOB_MQTTJOB_TOPICS = "job.mqttJob.topic"; + private String clientId; + + MqttConnectOptions options; public MqttSource() { } - private List splitSqlJob(String topics, String instanceId) { - if (StringUtils.isEmpty(topics)) { - return null; - } - final List result = new ArrayList<>(); - String[] topicList = topics.split(CommonConstants.COMMA); - if (Objects.nonNull(topicList)) { - Arrays.stream(topicList).forEach(topic -> { - MqttReader mqttReader = new MqttReader(topic); - mqttReader.setReadSource(instanceId); - result.add(mqttReader); - }); - } - return result; + @Override + protected String getThreadName() { + return "mqtt-source-" + taskId + "-" + instanceId; } @Override - public List split(TaskProfile conf) { - String topics = conf.get(JOB_MQTTJOB_TOPICS, StringUtils.EMPTY); - List readerList = null; - if (StringUtils.isNotEmpty(topics)) { - } - if (CollectionUtils.isNotEmpty(readerList)) { - sourceMetric.sourceSuccessCount.incrementAndGet(); - } else { - sourceMetric.sourceFailCount.incrementAndGet(); + protected void initSource(InstanceProfile profile) { + try { + LOGGER.info("MqttSource init: {}", profile.toJsonStr()); + mqttMessagesQueue = new LinkedBlockingQueue<>(profile.getInt(TaskConstants.TASK_MQTT_QUEUE_SIZE, 1000)); + serverURI = profile.get(TaskConstants.TASK_MQTT_SERVER_URI); + instanceId = profile.getInstanceId(); + topic = profile.get(TaskConstants.TASK_MQTT_TOPIC); + qos = profile.getInt(TaskConstants.TASK_MQTT_QOS, 1); + clientId = profile.get(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_" + UUID.randomUUID(); + initConnectOptions(profile); + mqttConnect(); + } catch (Exception e) { + stopRunning(); + throw new FileException("error init stream for {}" + topic, e); } - return readerList; } - @Override - protected String getThreadName() { - return null; + private void initConnectOptions(InstanceProfile profile) { + options = new MqttConnectOptions(); + options.setCleanSession(profile.getBoolean(TaskConstants.TASK_MQTT_CLEAN_SESSION, false)); + options.setConnectionTimeout(profile.getInt(TaskConstants.TASK_MQTT_CONNECTION_TIMEOUT, 10)); + options.setKeepAliveInterval(profile.getInt(TaskConstants.TASK_MQTT_KEEPALIVE_INTERVAL, 20)); + options.setUserName(profile.get(TaskConstants.TASK_MQTT_USERNAME, "")); + options.setPassword(profile.get(TaskConstants.TASK_MQTT_PASSWORD, "").toCharArray()); + options.setAutomaticReconnect(profile.getBoolean(TaskConstants.TASK_MQTT_AUTOMATIC_RECONNECT, true)); + options.setMqttVersion( + profile.getInt(TaskConstants.TASK_MQTT_VERSION, MqttConnectOptions.MQTT_VERSION_DEFAULT)); } - @Override - protected void initSource(InstanceProfile profile) { + private void mqttConnect() { + try { + client = new MqttClient(serverURI, clientId, new MemoryPersistence()); + client.setCallback(new MqttCallback() { + + @Override + public void connectionLost(Throwable cause) { + LOGGER.error("the mqtt jobId:{}, serverURI:{}, connection lost, {} ", instanceId, + serverURI, cause); + reconnect(); + } + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception { + Map headerMap = new HashMap<>(); + headerMap.put("record.topic", topic); + headerMap.put("record.messageId", String.valueOf(message.getId())); + headerMap.put("record.qos", String.valueOf(message.getQos())); + byte[] recordValue = message.getPayload(); + mqttMessagesQueue.offer(new DefaultMessage(recordValue, headerMap), 1, TimeUnit.SECONDS); + + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) { + } + }); + client.connect(options); + client.subscribe(topic, qos); + LOGGER.info("the mqtt subscribe topic is [{}], qos is [{}]", topic, qos); + } catch (Exception e) { + LOGGER.error("init mqtt client error {}. jobId:{},serverURI:{},clientId:{}", e, instanceId, serverURI, + clientId); + } + } + private void reconnect() { + if (!client.isConnected()) { + try { + client.connect(options); + LOGGER.info("the mqtt client reconnect success. jobId:{}, serverURI:{}, clientId:{}", instanceId, + serverURI, clientId); + } catch (Exception e) { + LOGGER.error("reconnect mqtt client error {}. jobId:{}, serverURI:{}, clientId:{}", e, instanceId, + serverURI, clientId); + } + } + } + + private void disconnect() { + try { + client.disconnect(); + } catch (MqttException e) { + LOGGER.error("disconnect mqtt client error {}. jobId:{},serverURI:{},clientId:{}", e, instanceId, serverURI, + clientId); + } } @Override protected void printCurrentState() { - + LOGGER.info("mqtt topic is {}", topic); } @Override protected boolean doPrepareToRead() { - return false; + return true; } @Override protected List readFromSource() { - return null; - } - - @Override - public Message read() { - return null; + List dataList = new ArrayList<>(); + try { + int size = 0; + while (size < BATCH_READ_LINE_TOTAL_LEN) { + Message msg = mqttMessagesQueue.poll(1, TimeUnit.SECONDS); + if (msg != null) { + SourceData sourceData = new SourceData(msg.getBody(), "0L"); + size += sourceData.getData().length; + dataList.add(sourceData); + } else { + break; + } + } + } catch (InterruptedException e) { + LOGGER.error("poll {} data from mqtt queue interrupted.", instanceId); + } + return dataList; } @Override @@ -117,16 +192,14 @@ protected boolean isRunnable() { @Override protected void releaseSource() { - - } - - @Override - public boolean sourceFinish() { - return false; + LOGGER.info("release mqtt source"); + if (client.isConnected()) { + disconnect(); + } } @Override public boolean sourceExist() { - return false; + return true; } } diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java index 253328bdced..45660748c26 100644 --- a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/sources/reader/MqttReader.java @@ -18,6 +18,7 @@ package org.apache.inlong.agent.plugin.sources.reader; import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.message.DefaultMessage; import org.apache.inlong.agent.metrics.audit.AuditUtils; import org.apache.inlong.agent.plugin.Message; @@ -43,19 +44,6 @@ public class MqttReader extends AbstractReader { private static final Logger LOGGER = LoggerFactory.getLogger(MqttReader.class); - public static final String JOB_MQTT_USERNAME = "job.mqttJob.userName"; - public static final String JOB_MQTT_PASSWORD = "job.mqttJob.password"; - public static final String JOB_MQTT_SERVER_URI = "job.mqttJob.serverURI"; - public static final String JOB_MQTT_TOPIC = "job.mqttJob.topic"; - public static final String JOB_MQTT_CONNECTION_TIMEOUT = "job.mqttJob.connectionTimeOut"; - public static final String JOB_MQTT_KEEPALIVE_INTERVAL = "job.mqttJob.keepAliveInterval"; - public static final String JOB_MQTT_QOS = "job.mqttJob.qos"; - public static final String JOB_MQTT_CLEAN_SESSION = "job.mqttJob.cleanSession"; - public static final String JOB_MQTT_CLIENT_ID_PREFIX = "job.mqttJob.clientIdPrefix"; - public static final String JOB_MQTT_QUEUE_SIZE = "job.mqttJob.queueSize"; - public static final String JOB_MQTT_AUTOMATIC_RECONNECT = "job.mqttJob.automaticReconnect"; - public static final String JOB_MQTT_VERSION = "job.mqttJob.mqttVersion"; - private boolean finished = false; private boolean destroyed = false; @@ -88,22 +76,20 @@ public MqttReader(String topic) { * @param jobConf */ private void setGlobalParamsValue(InstanceProfile jobConf) { - mqttMessagesQueue = new LinkedBlockingQueue<>(jobConf.getInt(JOB_MQTT_QUEUE_SIZE, 1000)); + mqttMessagesQueue = new LinkedBlockingQueue<>(jobConf.getInt(TaskConstants.TASK_MQTT_QUEUE_SIZE, 1000)); instanceId = jobConf.getInstanceId(); - userName = jobConf.get(JOB_MQTT_USERNAME); - password = jobConf.get(JOB_MQTT_PASSWORD); - serverURI = jobConf.get(JOB_MQTT_SERVER_URI); - topic = jobConf.get(JOB_MQTT_TOPIC); - clientId = jobConf.get(JOB_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_" + UUID.randomUUID(); - cleanSession = jobConf.getBoolean(JOB_MQTT_CLEAN_SESSION, false); - automaticReconnect = jobConf.getBoolean(JOB_MQTT_AUTOMATIC_RECONNECT, true); - qos = jobConf.getInt(JOB_MQTT_QOS, 1); - mqttVersion = jobConf.getInt(JOB_MQTT_VERSION, MqttConnectOptions.MQTT_VERSION_DEFAULT); - + userName = jobConf.get(TaskConstants.TASK_MQTT_USERNAME); + password = jobConf.get(TaskConstants.TASK_MQTT_PASSWORD); + serverURI = jobConf.get(TaskConstants.TASK_MQTT_SERVER_URI); + clientId = jobConf.get(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX, "mqtt_client") + "_" + UUID.randomUUID(); + cleanSession = jobConf.getBoolean(TaskConstants.TASK_MQTT_CLEAN_SESSION, false); + automaticReconnect = jobConf.getBoolean(TaskConstants.TASK_MQTT_AUTOMATIC_RECONNECT, true); + qos = jobConf.getInt(TaskConstants.TASK_MQTT_QOS, 1); + mqttVersion = jobConf.getInt(TaskConstants.TASK_MQTT_VERSION, MqttConnectOptions.MQTT_VERSION_DEFAULT); options = new MqttConnectOptions(); options.setCleanSession(cleanSession); - options.setConnectionTimeout(jobConf.getInt(JOB_MQTT_CONNECTION_TIMEOUT, 10)); - options.setKeepAliveInterval(jobConf.getInt(JOB_MQTT_KEEPALIVE_INTERVAL, 20)); + options.setConnectionTimeout(jobConf.getInt(TaskConstants.TASK_MQTT_CONNECTION_TIMEOUT, 10)); + options.setKeepAliveInterval(jobConf.getInt(TaskConstants.TASK_MQTT_KEEPALIVE_INTERVAL, 20)); options.setUserName(userName); options.setPassword(password.toCharArray()); options.setAutomaticReconnect(automaticReconnect); @@ -114,6 +100,7 @@ private void setGlobalParamsValue(InstanceProfile jobConf) { * connect to MQTT Broker */ private void connect() { + try { synchronized (MqttReader.class) { client = new MqttClient(serverURI, clientId, new MemoryPersistence()); diff --git a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java new file mode 100644 index 00000000000..1d7d9a3dc2f --- /dev/null +++ b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/MqttTask.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.agent.plugin.task; + +import org.apache.inlong.agent.conf.InstanceProfile; +import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.CycleUnitType; +import org.apache.inlong.agent.constant.TaskConstants; +import org.apache.inlong.agent.utils.AgentUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.time.format.DateTimeFormatter; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +public class MqttTask extends AbstractTask { + + private static final Logger LOGGER = LoggerFactory.getLogger(MqttTask.class); + + private String topic; + + private int instanceLimit = DEFAULT_INSTANCE_LIMIT; + + private AtomicBoolean isAdded = new AtomicBoolean(false); + + public static final String DEFAULT_MQTT_INSTANCE = "org.apache.inlong.agent.plugin.instance.MqttInstance"; + + private final DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern("yyyyMMddHH"); + + @Override + public boolean isProfileValid(TaskProfile profile) { + if (!profile.allRequiredKeyExist()) { + LOGGER.info("task profile needs all required key"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_MQTT_TOPIC)) { + LOGGER.info("task profile needs topic"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_MQTT_SERVER_URI)) { + LOGGER.info("task profile needs serverUri"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_MQTT_USERNAME)) { + LOGGER.info("task profile needs username"); + return false; + } + if (!profile.hasKey(TaskConstants.TASK_MQTT_PASSWORD)) { + LOGGER.info("task profile needs password"); + return false; + } + return true; + } + + protected void setInstanceLimit(int instanceLimit) { + this.instanceLimit = instanceLimit; + } + + @Override + protected int getInstanceLimit() { + return instanceLimit; + } + + @Override + protected void initTask() { + LOGGER.info("Mqtt commonInit: {}", taskProfile.toJsonStr()); + topic = taskProfile.get(TaskConstants.TASK_MQTT_TOPIC); + } + + @Override + protected List getNewInstanceList() { + List list = new ArrayList<>(); + if (isAdded.get()) { + return list; + } + String dataTime = LocalDateTime.now().format(dateTimeFormatter); + InstanceProfile instanceProfile = taskProfile.createInstanceProfile(DEFAULT_MQTT_INSTANCE, topic, + CycleUnitType.HOUR, dataTime, AgentUtils.getCurrentTime()); + LOGGER.info("taskProfile.createInstanceProfile(mqtt): {}", instanceProfile.toJsonStr()); + list.add(instanceProfile); + isAdded.set(true); + return list; + } +} diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java index 89aa196ac23..8ca9785e6d6 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttConnect.java @@ -18,9 +18,9 @@ package org.apache.inlong.agent.plugin.sources; import org.apache.inlong.agent.conf.TaskProfile; +import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.plugin.Message; import org.apache.inlong.agent.plugin.file.Reader; -import org.apache.inlong.agent.plugin.sources.reader.MqttReader; import org.apache.inlong.agent.utils.AgentUtils; import org.junit.Ignore; @@ -45,12 +45,12 @@ public class TestMqttConnect { @Ignore public void testMqttReader() throws Exception { TaskProfile jobProfile = TaskProfile.parseJsonStr("{}"); - jobProfile.set(MqttReader.JOB_MQTT_SERVER_URI, "tcp://broker.hivemq.com:1883"); - jobProfile.set(MqttReader.JOB_MQTT_CLIENT_ID_PREFIX, "mqtt_client"); - jobProfile.set(MqttReader.JOB_MQTT_USERNAME, "test"); - jobProfile.set(MqttReader.JOB_MQTT_PASSWORD, "test"); - jobProfile.set(MqttSource.JOB_MQTTJOB_TOPICS, "testtopic/mqtt/p1/ebr/delivered,testtopic/NARTU2"); - jobProfile.set(MqttReader.JOB_MQTT_QOS, "0"); + jobProfile.set(TaskConstants.TASK_MQTT_SERVER_URI, "tcp://broker.hivemq.com:1883"); + jobProfile.set(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX, "mqtt_client"); + jobProfile.set(TaskConstants.TASK_MQTT_USERNAME, "test"); + jobProfile.set(TaskConstants.TASK_MQTT_PASSWORD, "test"); + jobProfile.set(TaskConstants.TASK_MQTT_TOPIC, "testtopic/mqtt/p1/ebr/delivered,testtopic/NARTU2"); + jobProfile.set(TaskConstants.TASK_MQTT_QOS, "0"); jobProfile.set("job.instance.id", "_1"); final MqttSource source = new MqttSource(); diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java index 2caadd5c9ee..a2652869bae 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttReader.java @@ -19,6 +19,7 @@ import org.apache.inlong.agent.conf.InstanceProfile; import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.message.DefaultMessage; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; @@ -106,13 +107,13 @@ public void setUp() throws Exception { when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn(groupId); when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn(streamId); - when(jobProfile.get(eq(MqttReader.JOB_MQTT_USERNAME))).thenReturn(username); - when(jobProfile.get(eq(MqttReader.JOB_MQTT_PASSWORD))).thenReturn(password); - when(jobProfile.get(eq(MqttReader.JOB_MQTT_SERVER_URI))).thenReturn(serverURI); - when(jobProfile.get(eq(MqttReader.JOB_MQTT_QOS))).thenReturn(qos); - when(jobProfile.get(eq(MqttReader.JOB_MQTT_CLIENT_ID_PREFIX))).thenReturn(clientIdPrefix); + when(jobProfile.get(eq(TaskConstants.TASK_MQTT_USERNAME))).thenReturn(username); + when(jobProfile.get(eq(TaskConstants.TASK_MQTT_PASSWORD))).thenReturn(password); + when(jobProfile.get(eq(TaskConstants.TASK_MQTT_SERVER_URI))).thenReturn(serverURI); + when(jobProfile.get(eq(TaskConstants.TASK_MQTT_QOS))).thenReturn(qos); + when(jobProfile.get(eq(TaskConstants.TASK_MQTT_CLIENT_ID_PREFIX))).thenReturn(clientIdPrefix); when(jobProfile.getInstanceId()).thenReturn(INSTANCE_ID); - when(jobProfile.getInt(eq(MqttReader.JOB_MQTT_QUEUE_SIZE), eq(1000))).thenReturn(1000); + when(jobProfile.getInt(eq(TaskConstants.TASK_MQTT_QUEUE_SIZE), eq(1000))).thenReturn(1000); // mock MqttClient whenNew(MqttClient.class).withArguments(anyString(), anyString(), any(MemoryPersistence.class)) diff --git a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java index a956c07aacb..aeb178cb331 100644 --- a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java +++ b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/sources/TestMqttSource.java @@ -19,6 +19,7 @@ import org.apache.inlong.agent.conf.TaskProfile; import org.apache.inlong.agent.constant.CommonConstants; +import org.apache.inlong.agent.constant.TaskConstants; import org.apache.inlong.agent.metrics.AgentMetricItem; import org.apache.inlong.agent.metrics.AgentMetricItemSet; import org.apache.inlong.common.metric.MetricItem; @@ -90,7 +91,7 @@ public void testSplit() { // build mock when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_GROUP_ID), anyString())).thenReturn("test_group"); when(jobProfile.get(eq(CommonConstants.PROXY_INLONG_STREAM_ID), anyString())).thenReturn("test_stream"); - when(jobProfile.get(eq(MqttSource.JOB_MQTTJOB_TOPICS), eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY, + when(jobProfile.get(eq(TaskConstants.TASK_MQTT_TOPIC), eq(StringUtils.EMPTY))).thenReturn(StringUtils.EMPTY, topic1, topic2); final MqttSource source = new MqttSource();