From 9bda2715bb63c6709916879378efc8d241476a39 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 26 Jul 2023 20:55:54 +0800 Subject: [PATCH 1/5] Fix lookup issue for MQTT-5 --- .../mqtt/adapter/MQTTProxyAdapter.java | 31 ++++++++++++++ .../mqtt/messages/ack/MqttPubAck.java | 10 +++++ .../MQTTProxyProtocolMethodProcessor.java | 29 +++++++++++--- .../proxy/PulsarServiceLookupHandler.java | 11 ++--- .../mqtt/support/Qos1PublishHandler.java | 13 ++++++ .../base/MQTTProtocolHandlerTestBase.java | 2 +- .../proxy/MQTT5ProxyIntegrationTest.java | 40 +++++++++++++++++++ 7 files changed, 124 insertions(+), 12 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/MQTTProxyAdapter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/MQTTProxyAdapter.java index 271e8ea5a..217dfd51c 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/MQTTProxyAdapter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/MQTTProxyAdapter.java @@ -26,17 +26,23 @@ import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.mqtt.MqttDecoder; import io.netty.handler.codec.mqtt.MqttMessage; +import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader; import io.netty.handler.codec.mqtt.MqttMessageType; +import io.netty.handler.codec.mqtt.MqttProperties; +import io.netty.handler.codec.mqtt.MqttPubAckMessage; +import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.netty.handler.codec.mqtt.MqttSubAckMessage; import io.netty.handler.codec.mqtt.MqttUnsubAckMessage; import io.netty.util.concurrent.DefaultThreadFactory; import io.streamnative.pulsar.handlers.mqtt.Connection; +import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5PubReasonCode; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyProtocolMethodProcessor; import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService; import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils; import java.net.InetSocketAddress; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -45,6 +51,7 @@ import java.util.concurrent.atomic.AtomicLong; import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.netty.ChannelFutures; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -211,6 +218,30 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except clientChannel.writeAndFlush(adapterMsg); } break; + case PUBACK: + MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) msg; + MqttMessageIdVariableHeader variableHeader = pubAckMessage.variableHeader(); + if (variableHeader instanceof MqttPubReplyMessageVariableHeader) { + MqttPubReplyMessageVariableHeader header = + (MqttPubReplyMessageVariableHeader) variableHeader; + byte reasonCode = header.reasonCode(); + if (Mqtt5PubReasonCode.UNSPECIFIED_ERROR.byteValue() == reasonCode) { + String sourceTopicName = processor.getPacketIdTopic().get(variableHeader.messageId()); + if (StringUtils.isEmpty(sourceTopicName)) { + MqttProperties.UserProperties property = + (MqttProperties.UserProperties) header.properties() + .getProperty(MqttProperties.MqttPropertyType.USER_PROPERTY.value()); + if (property != null) { + List pairs = property.value(); + sourceTopicName = pairs.stream().filter(p -> p.key.equals("topicName")) + .map(p -> p.value).findFirst().orElse(""); + } + } + processor.removeTopicBroker(sourceTopicName); + } + } + clientChannel.writeAndFlush(adapterMsg); + break; default: clientChannel.writeAndFlush(adapterMsg); break; diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttPubAck.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttPubAck.java index bcd8ce7c8..50d84a627 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttPubAck.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/messages/ack/MqttPubAck.java @@ -72,6 +72,8 @@ public final static class MqttPubErrorAckBuilder { private Mqtt5PubReasonCode reasonCode; private String reasonString; + private MqttProperties.UserProperties userProperties; + public MqttPubErrorAckBuilder(int protocolVersion) { this.protocolVersion = protocolVersion; } @@ -81,6 +83,11 @@ public MqttPubAck.MqttPubErrorAckBuilder reasonString(String reasonStr) { return this; } + public MqttPubAck.MqttPubErrorAckBuilder userProperties(MqttProperties.UserProperties userProperties) { + this.userProperties = userProperties; + return this; + } + public MqttPubAck.MqttPubErrorAckBuilder packetId(int packetId) { this.packetId = packetId; return this; @@ -112,6 +119,9 @@ private MqttProperties getProperties() { if (reasonString != null) { MqttPropertyUtils.setReasonString(properties, reasonString); } + if (userProperties != null) { + properties.add(userProperties); + } return properties; } } diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java index b18f7d65e..e6204b938 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java @@ -56,6 +56,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.authentication.AuthenticationDataSource; import org.apache.pulsar.common.api.proto.CommandSubscribe; @@ -438,12 +439,28 @@ private CompletableFuture connectToBroker(final String topic) { return topicBrokers.computeIfAbsent(topic, key -> lookupHandler.findBroker(TopicName.get(topic)).thenApply(mqttBroker -> adapterChannels.computeIfAbsent(mqttBroker, key1 -> { - AdapterChannel adapterChannel = proxyAdapter.getAdapterChannel(mqttBroker); - adapterChannel.writeAndFlush(new MqttAdapterMessage( - connection.getClientId(), - connection.getConnectMessage())); - return adapterChannel; - }))); + AdapterChannel adapterChannel = proxyAdapter.getAdapterChannel(mqttBroker); + adapterChannel.writeAndFlush(new MqttAdapterMessage(connection.getClientId(), + connection.getConnectMessage())); + return adapterChannel; + }) + ) + ); + } + + public void removeTopicBroker(String topic) { + if (StringUtils.isNotEmpty(topic)) { + String pulsarTopicName = PulsarTopicUtils.getEncodedPulsarTopicName(topic, + proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(), + TopicDomain.getEnum(proxyConfig.getDefaultTopicDomain())); + final CompletableFuture brokerChannel = topicBrokers.remove(pulsarTopicName); + if (brokerChannel != null) { + final InetSocketAddress broker = brokerChannel.join().getBroker(); + if (log.isDebugEnabled()) { + log.debug("remove topic : {} from broker : {}", topic, broker); + } + } + } } public AtomicBoolean isDisconnected() { diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java index c3241e536..f0de08985 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/PulsarServiceLookupHandler.java @@ -151,11 +151,12 @@ public CompletableFuture findBroker(TopicName topicName) { private boolean isLookupMQTTBroker(Pair pair, LocalBrokerData localBrokerData) { - String pulsar = String.format("pulsar://%s:%d", pair.getLeft().getHostName(), pair.getLeft().getPort()); - String pulsarSsl = String.format("pulsar+ssl://%s:%d", pair.getLeft().getHostName(), pair.getLeft().getPort()); - return (localBrokerData.getPulsarServiceUrl().equals(pulsar) - || localBrokerData.getPulsarServiceUrlTls().equals(pulsarSsl) - && localBrokerData.getProtocol(protocolHandlerName).isPresent()); + + String plain = String.format("pulsar://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort()); + String ssl = String.format("pulsar+ssl://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort()); + return localBrokerData.getProtocol(protocolHandlerName).isPresent() + && (localBrokerData.getPulsarServiceUrl().equals(plain) + || localBrokerData.getPulsarServiceUrlTls().equals(ssl)); } @Override diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java index 9cec6ecc6..8dd1aa589 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/support/Qos1PublishHandler.java @@ -13,6 +13,7 @@ */ package io.streamnative.pulsar.handlers.mqtt.support; +import io.netty.handler.codec.mqtt.MqttProperties; import io.netty.handler.codec.mqtt.MqttPublishMessage; import io.streamnative.pulsar.handlers.mqtt.AbstractQosPublishHandler; import io.streamnative.pulsar.handlers.mqtt.Connection; @@ -94,6 +95,18 @@ public CompletableFuture publish(Connection connection, MqttAdapterMessage pubAckBuilder.reasonString("Topic not found"); } connection.sendAckThenClose(pubAckBuilder.build()); + } else if (realCause instanceof BrokerServiceException.ServiceUnitNotReadyException) { + String errorMsg = String.format("[%s] Publish message fail," + + " because the topic is not served by this broker", topic); + log.error(errorMsg); + MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties(); + userProperties.add("topicName", topic); + MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion) + .packetId(packetId) + .reasonString(errorMsg) + .userProperties(userProperties) + .reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR); + connection.sendAck(pubAckBuilder.build()); } else if (realCause instanceof MQTTTopicAliasNotFoundException) { log.error("[{}] Publish message fail {}, because the topic alias {} not found.", topic, msg, ((MQTTTopicAliasNotFoundException) realCause).getAlias(), ex); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java index e8dc12c15..37112de21 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/base/MQTTProtocolHandlerTestBase.java @@ -141,7 +141,7 @@ protected MQTTCommonConfiguration initConfig() throws Exception{ mqtt.setManagedLedgerCacheSizeMB(8); mqtt.setActiveConsumerFailoverDelayTimeMillis(0); mqtt.setDefaultRetentionTimeInMinutes(7); - mqtt.setDefaultNumberOfNamespaceBundles(1); + mqtt.setDefaultNumberOfNamespaceBundles(4); mqtt.setZookeeperServers("localhost:2181"); mqtt.setConfigurationStoreServers("localhost:3181"); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java index 1286c0a98..424969754 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java @@ -17,9 +17,12 @@ import com.hivemq.client.mqtt.MqttGlobalPublishFilter; import com.hivemq.client.mqtt.datatypes.MqttQos; import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient; +import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException; import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish; +import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult; import io.streamnative.pulsar.handlers.mqtt.MQTTCommonConfiguration; import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase; +import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5PubReasonCode; import io.streamnative.pulsar.handlers.mqtt.mqtt5.hivemq.base.MQTT5ClientUtils; import java.nio.charset.StandardCharsets; import java.util.Collections; @@ -44,6 +47,43 @@ public MQTTCommonConfiguration initConfig() throws Exception { return conf; } + @Test + public void testBrokerThrowServiceNotReadyException() throws Exception { + Mqtt5BlockingClient client = MQTT5ClientUtils.createMqtt5ProxyClient( + getMqttProxyPortList().get(random.nextInt(mqttProxyPortList.size()))); + client.connect(); + final String topic1 = "topic-1"; + final String ownedBroker = admin.lookups().lookupTopic(topic1); + + final Mqtt5PublishResult r1 = client.publishWith() + .topic(topic1) + .qos(MqttQos.AT_LEAST_ONCE) + .payload("msg1".getBytes(StandardCharsets.UTF_8)) + .send(); + Assert.assertFalse(r1.getError().isPresent()); + while (ownedBroker.equals(admin.lookups().lookupTopic(topic1))) { + admin.namespaces().unload("public/default"); + Thread.sleep(1000); + } + try { + final Mqtt5PublishResult r2 = client.publishWith() + .topic(topic1) + .qos(MqttQos.AT_LEAST_ONCE) + .payload("msg1".getBytes(StandardCharsets.UTF_8)) + .send(); + } catch (Exception ex) { + Assert.assertTrue(ex instanceof Mqtt5PubAckException); + Assert.assertEquals(((Mqtt5PubAckException) ex).getMqttMessage().getReasonCode().getCode() + , Mqtt5PubReasonCode.UNSPECIFIED_ERROR.value()); + } + final Mqtt5PublishResult r3 = client.publishWith() + .topic(topic1) + .qos(MqttQos.AT_LEAST_ONCE) + .payload("msg1".getBytes(StandardCharsets.UTF_8)) + .send(); + Assert.assertFalse(r3.getError().isPresent()); + } + @Test(invocationCount = 2) public void testDynamicUpdateSubscribe() throws InterruptedException, PulsarAdminException { final String topicFilter = "/a/#"; From 39cddc65966216cda1aa189f57a9aafa81ba4177 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 26 Jul 2023 20:58:24 +0800 Subject: [PATCH 2/5] close resource --- .../mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java index 424969754..7ec8fb9ae 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java @@ -82,6 +82,7 @@ public void testBrokerThrowServiceNotReadyException() throws Exception { .payload("msg1".getBytes(StandardCharsets.UTF_8)) .send(); Assert.assertFalse(r3.getError().isPresent()); + client.disconnect(); } @Test(invocationCount = 2) From 23ee989c8090aa7aff38ad0ecdf7848177348406 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 26 Jul 2023 21:42:08 +0800 Subject: [PATCH 3/5] address comment --- .../mqtt/adapter/MQTTProxyAdapter.java | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/MQTTProxyAdapter.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/MQTTProxyAdapter.java index 217dfd51c..edf233c6c 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/MQTTProxyAdapter.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/adapter/MQTTProxyAdapter.java @@ -51,7 +51,6 @@ import java.util.concurrent.atomic.AtomicLong; import lombok.Getter; import lombok.extern.slf4j.Slf4j; -import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.common.util.netty.ChannelFutures; import org.apache.pulsar.common.util.netty.EventLoopUtil; @@ -226,16 +225,14 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except (MqttPubReplyMessageVariableHeader) variableHeader; byte reasonCode = header.reasonCode(); if (Mqtt5PubReasonCode.UNSPECIFIED_ERROR.byteValue() == reasonCode) { - String sourceTopicName = processor.getPacketIdTopic().get(variableHeader.messageId()); - if (StringUtils.isEmpty(sourceTopicName)) { - MqttProperties.UserProperties property = - (MqttProperties.UserProperties) header.properties() - .getProperty(MqttProperties.MqttPropertyType.USER_PROPERTY.value()); - if (property != null) { - List pairs = property.value(); - sourceTopicName = pairs.stream().filter(p -> p.key.equals("topicName")) - .map(p -> p.value).findFirst().orElse(""); - } + String sourceTopicName = null; + MqttProperties.UserProperties property = + (MqttProperties.UserProperties) header.properties() + .getProperty(MqttProperties.MqttPropertyType.USER_PROPERTY.value()); + if (property != null) { + List pairs = property.value(); + sourceTopicName = pairs.stream().filter(p -> p.key.equals("topicName")) + .map(p -> p.value).findFirst().orElse(null); } processor.removeTopicBroker(sourceTopicName); } From 46aca0984cc706102acea0baf2f695b760bd23f0 Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Wed, 26 Jul 2023 21:57:28 +0800 Subject: [PATCH 4/5] add timeout --- .../mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java index 7ec8fb9ae..81941998b 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/mqtt/mqtt5/hivemq/proxy/MQTT5ProxyIntegrationTest.java @@ -47,7 +47,7 @@ public MQTTCommonConfiguration initConfig() throws Exception { return conf; } - @Test + @Test(timeOut = 30_000) public void testBrokerThrowServiceNotReadyException() throws Exception { Mqtt5BlockingClient client = MQTT5ClientUtils.createMqtt5ProxyClient( getMqttProxyPortList().get(random.nextInt(mqttProxyPortList.size()))); From 3423f17a796e40a05edd2058934f20ad865c895a Mon Sep 17 00:00:00 2001 From: Jiwe Guo Date: Thu, 27 Jul 2023 10:26:14 +0800 Subject: [PATCH 5/5] address comment --- .../mqtt/proxy/MQTTProxyProtocolMethodProcessor.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java index e6204b938..4d3c0f722 100644 --- a/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java +++ b/mqtt-impl/src/main/java/io/streamnative/pulsar/handlers/mqtt/proxy/MQTTProxyProtocolMethodProcessor.java @@ -455,10 +455,11 @@ public void removeTopicBroker(String topic) { TopicDomain.getEnum(proxyConfig.getDefaultTopicDomain())); final CompletableFuture brokerChannel = topicBrokers.remove(pulsarTopicName); if (brokerChannel != null) { - final InetSocketAddress broker = brokerChannel.join().getBroker(); - if (log.isDebugEnabled()) { - log.debug("remove topic : {} from broker : {}", topic, broker); - } + brokerChannel.thenAccept(channel -> { + if (log.isDebugEnabled()) { + log.debug("remove topic : {} from broker : {}", topic, channel.getBroker()); + } + }); } } }