Skip to content

Commit

Permalink
Fix lookup issue for MQTT-5 (#1040)
Browse files Browse the repository at this point in the history
  • Loading branch information
Technoboy- committed Jul 27, 2023
1 parent 6c34758 commit 49f9fbf
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,23 @@
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPubAckMessage;
import io.netty.handler.codec.mqtt.MqttPubReplyMessageVariableHeader;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.codec.mqtt.MqttSubAckMessage;
import io.netty.handler.codec.mqtt.MqttUnsubAckMessage;
import io.netty.util.concurrent.DefaultThreadFactory;
import io.streamnative.pulsar.handlers.mqtt.Connection;
import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5PubReasonCode;
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyProtocolMethodProcessor;
import io.streamnative.pulsar.handlers.mqtt.proxy.MQTTProxyService;
import io.streamnative.pulsar.handlers.mqtt.utils.MqttUtils;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -211,6 +217,28 @@ public void channelRead(ChannelHandlerContext ctx, Object message) throws Except
clientChannel.writeAndFlush(adapterMsg);
}
break;
case PUBACK:
MqttPubAckMessage pubAckMessage = (MqttPubAckMessage) msg;
MqttMessageIdVariableHeader variableHeader = pubAckMessage.variableHeader();
if (variableHeader instanceof MqttPubReplyMessageVariableHeader) {
MqttPubReplyMessageVariableHeader header =
(MqttPubReplyMessageVariableHeader) variableHeader;
byte reasonCode = header.reasonCode();
if (Mqtt5PubReasonCode.UNSPECIFIED_ERROR.byteValue() == reasonCode) {
String sourceTopicName = null;
MqttProperties.UserProperties property =
(MqttProperties.UserProperties) header.properties()
.getProperty(MqttProperties.MqttPropertyType.USER_PROPERTY.value());
if (property != null) {
List<MqttProperties.StringPair> pairs = property.value();
sourceTopicName = pairs.stream().filter(p -> p.key.equals("topicName"))
.map(p -> p.value).findFirst().orElse(null);
}
processor.removeTopicBroker(sourceTopicName);
}
}
clientChannel.writeAndFlush(adapterMsg);
break;
default:
clientChannel.writeAndFlush(adapterMsg);
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public final static class MqttPubErrorAckBuilder {
private Mqtt5PubReasonCode reasonCode;
private String reasonString;

private MqttProperties.UserProperties userProperties;

public MqttPubErrorAckBuilder(int protocolVersion) {
this.protocolVersion = protocolVersion;
}
Expand All @@ -81,6 +83,11 @@ public MqttPubAck.MqttPubErrorAckBuilder reasonString(String reasonStr) {
return this;
}

public MqttPubAck.MqttPubErrorAckBuilder userProperties(MqttProperties.UserProperties userProperties) {
this.userProperties = userProperties;
return this;
}

public MqttPubAck.MqttPubErrorAckBuilder packetId(int packetId) {
this.packetId = packetId;
return this;
Expand Down Expand Up @@ -112,6 +119,9 @@ private MqttProperties getProperties() {
if (reasonString != null) {
MqttPropertyUtils.setReasonString(properties, reasonString);
}
if (userProperties != null) {
properties.add(userProperties);
}
return properties;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.naming.TopicDomain;
Expand Down Expand Up @@ -435,12 +436,29 @@ private CompletableFuture<AdapterChannel> connectToBroker(final String topic) {
return topicBrokers.computeIfAbsent(topic,
key -> lookupHandler.findBroker(TopicName.get(topic)).thenApply(mqttBroker ->
adapterChannels.computeIfAbsent(mqttBroker, key1 -> {
AdapterChannel adapterChannel = proxyAdapter.getAdapterChannel(mqttBroker);
adapterChannel.writeAndFlush(new MqttAdapterMessage(
connection.getClientId(),
connection.getConnectMessage()));
return adapterChannel;
})));
AdapterChannel adapterChannel = proxyAdapter.getAdapterChannel(mqttBroker);
adapterChannel.writeAndFlush(new MqttAdapterMessage(connection.getClientId(),
connection.getConnectMessage()));
return adapterChannel;
})
)
);
}

public void removeTopicBroker(String topic) {
if (StringUtils.isNotEmpty(topic)) {
String pulsarTopicName = PulsarTopicUtils.getEncodedPulsarTopicName(topic,
proxyConfig.getDefaultTenant(), proxyConfig.getDefaultNamespace(),
TopicDomain.getEnum(proxyConfig.getDefaultTopicDomain()));
final CompletableFuture<AdapterChannel> brokerChannel = topicBrokers.remove(pulsarTopicName);
if (brokerChannel != null) {
brokerChannel.thenAccept(channel -> {
if (log.isDebugEnabled()) {
log.debug("remove topic : {} from broker : {}", topic, channel.getBroker());
}
});
}
}
}

public AtomicBoolean isDisconnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,11 @@ && isLookupMQTTBroker(lookupPair, brokerData.get()))

private boolean isLookupMQTTBroker(Pair<InetSocketAddress, InetSocketAddress> pair,
LocalBrokerData localBrokerData) {
return (localBrokerData.getPulsarServiceUrl().equals("pulsar://" + pair.getLeft().toString())
|| localBrokerData.getPulsarServiceUrlTls().equals("pulsar+ssl://" + pair.getLeft().toString()))
&& localBrokerData.getProtocol(protocolHandlerName).isPresent();
String plain = String.format("pulsar://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort());
String ssl = String.format("pulsar+ssl://%s:%s", pair.getLeft().getHostName(), pair.getLeft().getPort());
return localBrokerData.getProtocol(protocolHandlerName).isPresent()
&& (localBrokerData.getPulsarServiceUrl().equals(plain)
|| localBrokerData.getPulsarServiceUrlTls().equals(ssl));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.mqtt.support;

import io.netty.handler.codec.mqtt.MqttProperties;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.streamnative.pulsar.handlers.mqtt.AbstractQosPublishHandler;
import io.streamnative.pulsar.handlers.mqtt.Connection;
Expand Down Expand Up @@ -92,6 +93,18 @@ public CompletableFuture<Void> publish(Connection connection, MqttAdapterMessage
pubAckBuilder.reasonString("Topic not found");
}
connection.sendAckThenClose(pubAckBuilder.build());
} else if (realCause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
String errorMsg = String.format("[%s] Publish message fail,"
+ " because the topic is not served by this broker", topic);
log.error(errorMsg);
MqttProperties.UserProperties userProperties = new MqttProperties.UserProperties();
userProperties.add("topicName", topic);
MqttPubAck.MqttPubErrorAckBuilder pubAckBuilder = MqttPubAck.errorBuilder(protocolVersion)
.packetId(packetId)
.reasonString(errorMsg)
.userProperties(userProperties)
.reasonCode(Mqtt5PubReasonCode.UNSPECIFIED_ERROR);
connection.sendAck(pubAckBuilder.build());
} else if (realCause instanceof MQTTTopicAliasNotFoundException) {
log.error("[{}] Publish message fail {}, because the topic alias {} not found.", topic, msg,
((MQTTTopicAliasNotFoundException) realCause).getAlias(), ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ protected MQTTCommonConfiguration initConfig() throws Exception{
mqtt.setManagedLedgerCacheSizeMB(8);
mqtt.setActiveConsumerFailoverDelayTimeMillis(0);
mqtt.setDefaultRetentionTimeInMinutes(7);
mqtt.setDefaultNumberOfNamespaceBundles(1);
mqtt.setDefaultNumberOfNamespaceBundles(4);
mqtt.setZookeeperServers("localhost:2181");
mqtt.setConfigurationStoreServers("localhost:3181");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import io.streamnative.pulsar.handlers.mqtt.MQTTCommonConfiguration;
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
import io.streamnative.pulsar.handlers.mqtt.messages.codes.mqtt5.Mqtt5PubReasonCode;
import io.streamnative.pulsar.handlers.mqtt.mqtt5.hivemq.base.MQTT5ClientUtils;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
Expand All @@ -44,6 +47,44 @@ public MQTTCommonConfiguration initConfig() throws Exception {
return conf;
}

@Test(timeOut = 30_000)
public void testBrokerThrowServiceNotReadyException() throws Exception {
Mqtt5BlockingClient client = MQTT5ClientUtils.createMqtt5ProxyClient(
getMqttProxyPortList().get(random.nextInt(mqttProxyPortList.size())));
client.connect();
final String topic1 = "topic-1";
final String ownedBroker = admin.lookups().lookupTopic(topic1);

final Mqtt5PublishResult r1 = client.publishWith()
.topic(topic1)
.qos(MqttQos.AT_LEAST_ONCE)
.payload("msg1".getBytes(StandardCharsets.UTF_8))
.send();
Assert.assertFalse(r1.getError().isPresent());
while (ownedBroker.equals(admin.lookups().lookupTopic(topic1))) {
admin.namespaces().unload("public/default");
Thread.sleep(1000);
}
try {
final Mqtt5PublishResult r2 = client.publishWith()
.topic(topic1)
.qos(MqttQos.AT_LEAST_ONCE)
.payload("msg1".getBytes(StandardCharsets.UTF_8))
.send();
} catch (Exception ex) {
Assert.assertTrue(ex instanceof Mqtt5PubAckException);
Assert.assertEquals(((Mqtt5PubAckException) ex).getMqttMessage().getReasonCode().getCode()
, Mqtt5PubReasonCode.UNSPECIFIED_ERROR.value());
}
final Mqtt5PublishResult r3 = client.publishWith()
.topic(topic1)
.qos(MqttQos.AT_LEAST_ONCE)
.payload("msg1".getBytes(StandardCharsets.UTF_8))
.send();
Assert.assertFalse(r3.getError().isPresent());
client.disconnect();
}

@Test(invocationCount = 2)
public void testDynamicUpdateSubscribe() throws InterruptedException, PulsarAdminException {
final String topicFilter = "/a/#";
Expand Down

0 comments on commit 49f9fbf

Please sign in to comment.