diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AbstractAmqpExchange.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AbstractAmqpExchange.java index ce7ab48d..41a2ed96 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AbstractAmqpExchange.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AbstractAmqpExchange.java @@ -26,7 +26,6 @@ public abstract class AbstractAmqpExchange implements AmqpExchange { protected Set queues; protected boolean durable; protected boolean autoDelete; - public static final String DEFAULT_EXCHANGE = "aop.direct.memory"; public static final String DEFAULT_EXCHANGE_DURABLE = "aop.direct.durable"; protected AbstractAmqpExchange(String exchangeName, AmqpExchange.Type exchangeType, diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java index f1ef31dd..f14c0db1 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpChannel.java @@ -160,32 +160,17 @@ public void receiveExchangeDeclare(AMQShortString exchange, AMQShortString type, type, passive, durable, autoDelete, internal, nowait, arguments); } - String exchangeName = exchange.toString(). - replaceAll("\r", ""). - replaceAll("\n", "").trim(); - final MethodRegistry methodRegistry = connection.getMethodRegistry(); - final AMQMethodBody declareOkBody = methodRegistry.createExchangeDeclareOkBody(); - if (isDefaultExchange(exchange)) { - if (!AMQShortString.createAMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type)) { - StringBuffer sb = new StringBuffer(); - sb.append("Attempt to redeclare default exchange: of type") - .append(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).append("to").append(type).append("."); - connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, sb.toString(), channelId); - } else { - // in-memory integration - if (!durable) { - InMemoryExchange inMemoryExchange = new InMemoryExchange( - exchangeName, AmqpExchange.Type.value(type.toString()), autoDelete); - ExchangeContainer.putExchange(connection.getNamespaceName(), exchange.toString(), inMemoryExchange); - } - if (!nowait) { - sync(); - // if declare a default exchange, return success. - connection.writeFrame(declareOkBody.generateFrame(channelId)); - } - } + StringBuffer sb = new StringBuffer(); + sb.append("Attempt to redeclare default exchange: of type") + .append(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).append("to").append(type).append("."); + connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, sb.toString(), channelId); } else { + String exchangeName = exchange.toString(). + replaceAll("\r", ""). + replaceAll("\n", "").trim(); + final MethodRegistry methodRegistry = connection.getMethodRegistry(); + final AMQMethodBody declareOkBody = methodRegistry.createExchangeDeclareOkBody(); AmqpExchange amqpExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), exchangeName); if (passive) { if (null == amqpExchange) { @@ -301,12 +286,16 @@ public void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boo log.debug("RECV[{}] ExchangeDelete[ exchange: {}, ifUnused: {}, nowait:{} ]", channelId, exchange, ifUnused, nowait); } - String exchangeName = exchange.toString(). - replaceAll("\r", ""). - replaceAll("\n", "").trim(); if (isDefaultExchange(exchange)) { - connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Default Exchange cannot be deleted. ", channelId); + connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, "Default Exchange cannot be deleted. ", + channelId); + } else if (isBuildInExchange(exchange)) { + connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, "BuildIn Exchange cannot be deleted. ", + channelId); } else { + String exchangeName = exchange.toString(). + replaceAll("\r", ""). + replaceAll("\n", "").trim(); AmqpExchange amqpExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), exchangeName); if (null == amqpExchange) { closeChannel(ErrorCodes.NOT_FOUND, "No such exchange: '" + exchange + "'"); @@ -421,8 +410,7 @@ public void receiveQueueDeclare(AMQShortString queue, boolean passive, boolean d } QueueContainer.putQueue(connection.getNamespaceName(), queue.toString(), amqpQueue); setDefaultQueue(amqpQueue); - // return success. - // when call QueueBind, then create Pulsar sub. + MethodRegistry methodRegistry = connection.getMethodRegistry(); QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queue, 0, 0); connection.writeFrame(responseBody.generateFrame(channelId)); @@ -438,7 +426,7 @@ public void receiveQueueBind(AMQShortString queue, AMQShortString exchange, AMQS channelId, queue, exchange, bindingKey, nowait, argumentsTable); } Map arguments = FieldTable.convertToMap(argumentsTable); - + String exchangeName; AmqpQueue amqpQueue; if (queue == null) { amqpQueue = getDefaultQueue(); @@ -456,11 +444,20 @@ public void receiveQueueBind(AMQShortString queue, AMQShortString exchange, AMQS } checkExclusiveQueue(amqpQueue); - AmqpExchange amqpExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), exchange.toString()); + if (isDefaultExchange(exchange)) { + exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE; + } else { + exchangeName = exchange.toString(); + } + AmqpExchange amqpExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), exchangeName); if (amqpExchange == null) { closeChannel(ErrorCodes.NOT_FOUND, "No such exchange: '" + exchange + "'"); return; } + if (exchangeName.equals(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE)){ + closeChannel(ErrorCodes.ACCESS_REFUSED, "Can not bind to default exchange "); + return; + } AmqpMessageRouter messageRouter = AbstractAmqpMessageRouter.generateRouter(amqpExchange.getType()); if (messageRouter == null) { @@ -507,12 +504,28 @@ public void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean i log.debug("RECV[{}] QueueDelete[ queue: {}, ifUnused:{}, ifEmpty:{}, nowait:{} ]", channelId, queue, ifUnused, ifEmpty, nowait); } - // TODO - // return success. - MethodRegistry methodRegistry = connection.getMethodRegistry(); - QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(1); - connection.writeFrame(responseBody.generateFrame(channelId)); - + AmqpQueue amqpQueue; + if (queue == null) { + //get the default queue on the channel: + amqpQueue = getDefaultQueue(); + } else { + amqpQueue = QueueContainer.getQueue(connection.getNamespaceName(), queue.toString()); + } + if (amqpQueue == null) { + closeChannel(ErrorCodes.NOT_FOUND, "Queue '" + queue.toString() + "' does not exist."); + } else { + checkExclusiveQueue(amqpQueue); + if (!nowait) { + sync(); + QueueContainer.deleteQueue(connection.getNamespaceName(), queue.toString()); + AmqpExchange defaultExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), + AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE); + amqpQueue.unbindExchange(defaultExchange); + MethodRegistry methodRegistry = connection.getMethodRegistry(); + QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(0); + connection.writeFrame(responseBody.generateFrame(channelId)); + } + } } @Override @@ -523,19 +536,32 @@ public void receiveQueueUnbind(AMQShortString queue, AMQShortString exchange, AM exchange, bindingKey, arguments); } - // in-memory integration AmqpQueue amqpQueue = QueueContainer.getQueue(connection.getNamespaceName(), queue.toString()); checkExclusiveQueue(amqpQueue); + String exchangeName; + if (isDefaultExchange(exchange)) { + exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE; + } else { + exchangeName = exchange.toString(); + } AmqpExchange amqpExchange = ExchangeContainer. - getExchange(connection.getNamespaceName(), exchange.toString()); + getExchange(connection.getNamespaceName(), exchangeName); + if (exchangeName.equals(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE)) { + closeChannel(ErrorCodes.ACCESS_REFUSED, "Can not unbind to default exchange "); + return; + } if (amqpQueue instanceof InMemoryQueue && amqpExchange instanceof InMemoryExchange) { amqpQueue.unbindExchange(amqpExchange); + if (amqpExchange.getAutoDelete() && (amqpExchange.getQueueSize() == 0)) { + ExchangeContainer.deleteExchange(connection.getNamespaceName(), exchangeName); + } AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody(); connection.writeFrame(responseBody.generateFrame(channelId)); + return; } TopicName topicName = TopicName.get(TopicDomain.persistent.value(), - connection.getNamespaceName(), exchange.toString()); + connection.getNamespaceName(), exchangeName); Topic topic = AmqpTopicManager.getOrCreateTopic(connection.getPulsarService(), topicName.toString(), false); if (null == topic) { @@ -543,6 +569,10 @@ public void receiveQueueUnbind(AMQShortString queue, AMQShortString exchange, AM } else { try { amqpQueue.unbindExchange(amqpExchange); + if (amqpExchange.getAutoDelete() && (amqpExchange.getQueueSize() == 0)) { + ExchangeContainer.deleteExchange(connection.getNamespaceName(), exchange.toString()); + topic.delete().get(); + } final AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody(); connection.writeFrame(responseBody.generateFrame(channelId)); } catch (Exception e) { @@ -699,16 +729,18 @@ public void receiveBasicCancel(AMQShortString consumerTag, boolean noWait) { } @Override - public void receiveBasicPublish(AMQShortString exchangeName, AMQShortString routingKey, boolean mandatory, + public void receiveBasicPublish(AMQShortString exchange, AMQShortString routingKey, boolean mandatory, boolean immediate) { if (log.isDebugEnabled()) { log.debug("RECV[{}] BasicPublish[exchange: {} routingKey: {} mandatory: {} immediate: {}]", - channelId, exchangeName, routingKey, mandatory, immediate); + channelId, exchange, routingKey, mandatory, immediate); } + String exchangeName; - if (exchangeName == null || exchangeName.length() == 0) { + if (exchange == null || exchange.length() == 0) { AmqpExchange amqpExchange = ExchangeContainer. getExchange(connection.getNamespaceName(), AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE); + exchangeName = amqpExchange.getName(); AmqpQueue amqpQueue = QueueContainer.getQueue(connection.getNamespaceName(), routingKey.toString()); if (amqpQueue == null) { @@ -717,13 +749,17 @@ public void receiveBasicPublish(AMQShortString exchangeName, AMQShortString rout return; } -// if (amqpQueue.getRouter("") == null) { -// AmqpMessageRouter amqpMessageRouter = new DirectMessageRouter( -// AmqpMessageRouter.Type.Direct, routingKey.toString()); -// amqpQueue.bindExchange(amqpExchange, amqpMessageRouter); -// } + // bind to default exchange. + if (amqpQueue.getRouter(exchangeName) == null) { + amqpQueue.bindExchange(amqpExchange, + AbstractAmqpMessageRouter.generateRouter(AmqpExchange.Type.Direct), + routingKey.toString(), null); + } + } else { + exchangeName = exchange.toString(); } - MessagePublishInfo info = new MessagePublishInfo(exchangeName, immediate, mandatory, routingKey); + MessagePublishInfo info = new MessagePublishInfo(AMQShortString.valueOf(exchangeName), immediate, + mandatory, routingKey); setPublishFrame(info, null); } @@ -888,14 +924,9 @@ private void deliverCurrentMessageIfComplete() { return; } - AmqpQueue amqpQueue = QueueContainer.getQueue(connection.getNamespaceName(), routingKey); AmqpExchange amqpExchange; if (exchangeName == null || exchangeName.length() == 0) { - if (amqpQueue.getDurable()) { - exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE; - } else { - exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE; - } + exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE; } amqpExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), exchangeName); if (amqpExchange == null) { @@ -1119,6 +1150,16 @@ private boolean isDefaultExchange(final AMQShortString exchangeName) { return exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName); } + private boolean isBuildInExchange(final AMQShortString exchangeName) { + if (exchangeName.toString().equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME) + || (exchangeName.toString().equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME)) + || (exchangeName.toString().equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))) { + return true; + } else { + return false; + } + } + private void closeChannel(int cause, final String message) { connection.closeChannelAndWriteFrame(this, cause, message); } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java index 519db0d7..b3c020b7 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java @@ -38,6 +38,8 @@ import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicDomain; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.ErrorCodes; @@ -248,15 +250,23 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap assertState(ConnectionState.AWAIT_OPEN); + boolean isDefaultNamespace = false; String virtualHostStr = AMQShortString.toString(virtualHost); if ((virtualHostStr != null) && virtualHostStr.charAt(0) == '/') { virtualHostStr = virtualHostStr.substring(1); if (StringUtils.isEmpty(virtualHostStr)){ virtualHostStr = DEFAULT_NAMESPACE; + isDefaultNamespace = true; } } NamespaceName namespaceName = NamespaceName.get(amqpConfig.getAmqpTenant(), virtualHostStr); + if (isDefaultNamespace) { + // avoid the namespace public/default is not owned in standalone mode + TopicName topic = TopicName.get(TopicDomain.persistent.value(), + namespaceName, "__lookup__"); + getPulsarService().getNamespaceService().getBrokerServiceUrlAsync(topic, true); + } // Policies policies = getPolicies(namespaceName); // if (policies != null) { this.namespaceName = namespaceName; diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java index 64b09500..b1ab446f 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java @@ -87,6 +87,7 @@ public void start(BrokerService service) { brokerService = service; ConnectionContainer.init(brokerService.getPulsar()); + ExchangeContainer.init(brokerService.getPulsar()); if (amqpConfig.isAmqpProxyEnable()) { ProxyConfiguration proxyConfig = new ProxyConfiguration(); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ConnectionContainer.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ConnectionContainer.java index 9ed55754..3dff6691 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ConnectionContainer.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ConnectionContainer.java @@ -15,18 +15,14 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; -import io.streamnative.pulsar.handlers.amqp.impl.InMemoryExchange; -import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange; import java.util.Collections; import java.util.Map; import java.util.Set; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener; -import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.naming.NamespaceBundle; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.qpid.server.exchange.ExchangeDefaults; import org.apache.zookeeper.ZooKeeper; /** @@ -46,7 +42,7 @@ public static void init(PulsarService pulsarService) { @Override public void onLoad(NamespaceBundle namespaceBundle) { log.info("ConnectionContainer [onLoad] namespaceBundle: {}", namespaceBundle); - defaultExchangeInit(namespaceBundle.getNamespaceObject()); + ExchangeContainer.initBuildInExchange(namespaceBundle.getNamespaceObject()); } @Override @@ -99,33 +95,4 @@ public static void removeConnection(NamespaceName namespaceName, AmqpConnection connectionMap.getOrDefault(namespaceName, Collections.emptySet()).remove(amqpConnection); } - private static void defaultExchangeInit(NamespaceName namespaceName) { - AmqpExchange inMemoryExchange = ExchangeContainer.getExchange(namespaceName, - AbstractAmqpExchange.DEFAULT_EXCHANGE); - - if (inMemoryExchange == null) { - ExchangeContainer.putExchange(namespaceName, AbstractAmqpExchange.DEFAULT_EXCHANGE, - new InMemoryExchange("", AmqpExchange.Type.Direct, false)); - } - - addBuildInExchanges(namespaceName, AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE, AmqpExchange.Type.Direct); - addBuildInExchanges(namespaceName, ExchangeDefaults.DIRECT_EXCHANGE_NAME, AmqpExchange.Type.Direct); - addBuildInExchanges(namespaceName, ExchangeDefaults.FANOUT_EXCHANGE_NAME, AmqpExchange.Type.Fanout); - addBuildInExchanges(namespaceName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, AmqpExchange.Type.Topic); - } - - private static void addBuildInExchanges(NamespaceName namespaceName, - String exchangeName, AmqpExchange.Type exchangeType) { - String topicName = PersistentExchange.getExchangeTopicName(namespaceName, exchangeName); - AmqpTopicManager.getTopic(pulsarService, topicName, true).whenComplete((topic, throwable) -> { - if (throwable != null) { - log.error("Create default exchange topic failed. errorMsg: {}", throwable.getMessage(), throwable); - return; - } - ExchangeContainer.putExchange(namespaceName, exchangeName, - new PersistentExchange(exchangeName, exchangeType, - (PersistentTopic) topic, false)); - }); - } - } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java index a9696ded..6a0d4b2c 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java @@ -15,12 +15,18 @@ package io.streamnative.pulsar.handlers.amqp; import com.google.common.collect.Maps; +import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.qpid.server.exchange.ExchangeDefaults; + /** * Container for all exchanges in the broker. @@ -28,6 +34,19 @@ @Slf4j public class ExchangeContainer { + private static final Map BUILDIN_EXCHANGE_NAME_SET = new HashMap<>(); + private static PulsarService pulsarService; + + public static void init(PulsarService pulsarService) { + if (ExchangeContainer.pulsarService == null) { + ExchangeContainer.pulsarService = pulsarService; + BUILDIN_EXCHANGE_NAME_SET.put(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE, AmqpExchange.Type.Direct); + BUILDIN_EXCHANGE_NAME_SET.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AmqpExchange.Type.Direct); + BUILDIN_EXCHANGE_NAME_SET.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AmqpExchange.Type.Fanout); + BUILDIN_EXCHANGE_NAME_SET.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AmqpExchange.Type.Topic); + } + } + @Getter private static Map> exchangeMap = new ConcurrentHashMap<>(); @@ -62,4 +81,26 @@ public static void deleteExchange(NamespaceName namespaceName, String exchangeNa } } + public static void initBuildInExchange(NamespaceName namespaceName) { + for (Map.Entry entry : BUILDIN_EXCHANGE_NAME_SET.entrySet()) { + if (getExchange(namespaceName, entry.getKey()) == null) { + addBuildInExchanges(namespaceName, entry.getKey(), entry.getValue()); + } + } + } + + private static void addBuildInExchanges(NamespaceName namespaceName, + String exchangeName, AmqpExchange.Type exchangeType) { + String topicName = PersistentExchange.getExchangeTopicName(namespaceName, exchangeName); + AmqpTopicManager.getTopic(pulsarService, topicName, true).whenComplete((topic, throwable) -> { + if (throwable != null) { + log.error("Create default exchange topic failed. errorMsg: {}", throwable.getMessage(), throwable); + return; + } + ExchangeContainer.putExchange(namespaceName, exchangeName, + new PersistentExchange(exchangeName, exchangeType, + (PersistentTopic) topic, false)); + }); + } + } diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpChannelMethodTest.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpChannelMethodTest.java index db364765..86a8ef13 100644 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpChannelMethodTest.java +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpChannelMethodTest.java @@ -105,10 +105,6 @@ public void testAccessRequest() { @Test public void testExchangeDeclareFail() { - String tenant = "public"; - String namespace = "ns"; - NamespaceName namespaceName = NamespaceName.get(tenant, namespace); - connection.setNamespaceName(namespaceName); Mockito.when(connection.getPulsarService().getState()).thenReturn(PulsarService.State.Init); ExchangeDeclareBody cmd = methodRegistry .createExchangeDeclareBody(0, "test", "fanout", false, false, false, false, true, null); @@ -120,11 +116,7 @@ public void testExchangeDeclareFail() { @Test public void testExchangeDeclareSuccess() { - String tenant = "public"; - String namespace = "ns"; String exchange = "test"; - NamespaceName namespaceName = NamespaceName.get(tenant, namespace); - connection.setNamespaceName(namespaceName); Mockito.when(connection.getPulsarService().getState()).thenReturn(PulsarService.State.Started); ExchangeDeclareBody cmd = methodRegistry @@ -137,11 +129,7 @@ public void testExchangeDeclareSuccess() { @Test public void testExchangeDeclarePassive(){ - String tenant = "public"; - String namespace = "ns"; String exchange = "test"; - NamespaceName namespaceName = NamespaceName.get(tenant, namespace); - connection.setNamespaceName(namespaceName); Mockito.when(connection.getPulsarService().getState()).thenReturn(PulsarService.State.Started); ExchangeDeclareBody cmd1 = methodRegistry .createExchangeDeclareBody(0, exchange, "fanout", @@ -164,8 +152,6 @@ public void testExchangeDeclarePassive(){ @Test public void testQueueDeclarePassive(){ - NamespaceName namespaceName = NamespaceName.get("public", "vhost1"); - connection.setNamespaceName(namespaceName); QueueDeclareBody cmd1 = methodRegistry.createQueueDeclareBody(0, "queue", false, true, false, false, false, null); cmd1.generateFrame(1).writePayload(toServerSender); @@ -187,11 +173,7 @@ public void testQueueDeclarePassive(){ @SneakyThrows @Test public void testExchangeDelete() { - String tenant = "public"; - String namespace = "ns"; String exchange = "test1"; - NamespaceName namespaceName = NamespaceName.get(tenant, namespace); - connection.setNamespaceName(namespaceName); Mockito.when(connection.getPulsarService().getState()).thenReturn(PulsarService.State.Started); ExchangeDeclareBody cmd = methodRegistry @@ -212,13 +194,9 @@ public void testExchangeDelete() { @Test public void testExchangeBound() { - String tenant = "public"; - String namespace = "ns"; String exchange = "test"; List subs = new ArrayList<>(); subs.add(exchange); - NamespaceName namespaceName = NamespaceName.get(tenant, namespace); - connection.setNamespaceName(namespaceName); ExchangeBoundBody cmd = methodRegistry.createExchangeBoundBody(exchange, "key", "queue"); cmd.generateFrame(1).writePayload(toServerSender); @@ -242,14 +220,10 @@ public void testQueueDeclare() { @SneakyThrows @Test(dataProvider = "exchangeAndQueueIsDurableBuilder") public void testQueueBind(boolean exchangeIsDurable, boolean queueIsDurable) { - String tenant = "public"; - String namespace = "ns"; String exchange = "test" + count; String queue = "queue" + count; List subs = new ArrayList<>(); subs.add(exchange); - NamespaceName namespaceName = NamespaceName.get(tenant, namespace); - connection.setNamespaceName(namespaceName); Mockito.when(connection.getPulsarService().getState()).thenReturn(PulsarService.State.Started); exchangeDeclare(exchange, exchangeIsDurable); @@ -284,25 +258,31 @@ public void testQueuePurge() { @Test public void testQueueDelete() { - QueueDeleteBody cmd = methodRegistry.createQueueDeleteBody(0, "queue", false, false, - false); + NamespaceName namespaceName = NamespaceName.get("public", "vhost1"); + connection.setNamespaceName(namespaceName); + QueueDeclareBody cmd = methodRegistry.createQueueDeclareBody(0, "queue", false, true, + false, false, false, null); cmd.generateFrame(1).writePayload(toServerSender); toServerSender.flush(); + + QueueDeleteBody cmd1 = methodRegistry.createQueueDeleteBody(0, "queue", false, false, + false); + cmd1.generateFrame(1).writePayload(toServerSender); + toServerSender.flush(); + AMQBody response = (AMQBody) clientChannel.poll(); - Assert.assertTrue(response instanceof QueueDeleteOkBody); + Assert.assertTrue(response instanceof QueueDeclareOkBody); + AMQBody response1 = (AMQBody) clientChannel.poll(); + Assert.assertTrue(response1 instanceof QueueDeleteOkBody); } @SneakyThrows @Test public void testQueueUnbind() { - String tenant = "public"; - String namespace = "ns"; String exchange = "test"; String queue = "queue"; List subs = new ArrayList<>(); subs.add(exchange); - NamespaceName namespaceName = NamespaceName.get(tenant, namespace); - connection.setNamespaceName(namespaceName); exchangeDeclare(exchange, true); queueDeclare(queue, true); diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java index 4fa41b66..ea240aca 100644 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/AmqpProtocolTestBase.java @@ -17,11 +17,16 @@ import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPipeline; import io.netty.util.concurrent.DefaultEventExecutor; +import io.streamnative.pulsar.handlers.amqp.AbstractAmqpExchange; import io.streamnative.pulsar.handlers.amqp.AmqpChannel; import io.streamnative.pulsar.handlers.amqp.AmqpClientDecoder; import io.streamnative.pulsar.handlers.amqp.AmqpConnection; +import io.streamnative.pulsar.handlers.amqp.AmqpExchange; import io.streamnative.pulsar.handlers.amqp.AmqpPulsarServerCnx; import io.streamnative.pulsar.handlers.amqp.AmqpServiceConfiguration; +import io.streamnative.pulsar.handlers.amqp.AmqpTopicManager; +import io.streamnative.pulsar.handlers.amqp.ExchangeContainer; +import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange; import io.streamnative.pulsar.handlers.amqp.test.frame.AmqpClientChannel; import io.streamnative.pulsar.handlers.amqp.test.frame.AmqpClientMethodProcessor; import io.streamnative.pulsar.handlers.amqp.test.frame.ToClientByteBufferSender; @@ -48,8 +53,10 @@ import org.apache.pulsar.client.admin.Namespaces; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.common.lookup.data.LookupData; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; +import org.apache.qpid.server.exchange.ExchangeDefaults; import org.apache.qpid.server.protocol.ProtocolVersion; import org.apache.qpid.server.protocol.v0_8.AMQShortString; import org.apache.qpid.server.protocol.v0_8.transport.AMQBody; @@ -108,6 +115,28 @@ public void setup() throws Exception { Mockito.when(connection.getPulsarService().getState()).thenReturn(PulsarService.State.Started); initMockAmqpTopicManager(); initProtocol(); + initDefaultExchange(); + } + + private void initDefaultExchange() { + String tenant = "public"; + String namespace = "vhost1"; + NamespaceName namespaceName = NamespaceName.get(tenant, namespace); + connection.setNamespaceName(namespaceName); + addBuildInExchanges(namespaceName, AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE, AmqpExchange.Type.Direct); + addBuildInExchanges(namespaceName, ExchangeDefaults.DIRECT_EXCHANGE_NAME, AmqpExchange.Type.Direct); + addBuildInExchanges(namespaceName, ExchangeDefaults.FANOUT_EXCHANGE_NAME, AmqpExchange.Type.Fanout); + addBuildInExchanges(namespaceName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, AmqpExchange.Type.Topic); + } + + private void addBuildInExchanges(NamespaceName namespaceName, + String exchangeName, AmqpExchange.Type exchangeType) { + String topicName = PersistentExchange.getExchangeTopicName(namespaceName, exchangeName); + Topic topic = AmqpTopicManager. + getOrCreateTopic(connection.getPulsarService(), topicName, true); + ExchangeContainer.putExchange(namespaceName, exchangeName, + new PersistentExchange(exchangeName, exchangeType, + (PersistentTopic) topic, false)); } private void initMockAmqpTopicManager(){ diff --git a/tests/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java b/tests/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java index a8a9a363..13bec2de 100644 --- a/tests/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java +++ b/tests/src/test/java/com/rabbitmq/client/test/BrokerTestCase.java @@ -37,6 +37,8 @@ import java.util.concurrent.TimeoutException; import javax.net.ssl.SSLContext; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.impl.PulsarClientImpl; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.ClusterData; import org.apache.pulsar.common.policies.data.RetentionPolicies; import org.apache.pulsar.common.policies.data.TenantInfo; @@ -73,7 +75,7 @@ protected void finished(Description description) { } }; - protected ConnectionFactory connectionFactory = newConnectionFactory(); + protected ConnectionFactory connectionFactory; protected ConnectionFactory newConnectionFactory() { ConnectionFactory connectionFactory = TestUtils.connectionFactory(); @@ -119,6 +121,8 @@ public void setup() throws Exception { admin.namespaces().setRetention("public/vhost1", new RetentionPolicies(60, 1000)); } + ((PulsarClientImpl) getPulsarServiceList().get(0).getClient()).getLookup(). + getBroker(TopicName.get("public/vhost1/test")).join(); checkPulsarServiceState(); assumeTrue(shouldRun()); openConnection(); @@ -191,6 +195,7 @@ protected void bareRestart() public void openConnection() throws IOException, TimeoutException { if (connection == null) { + connectionFactory = newConnectionFactory(); connectionFactory.setPort(getAmqpBrokerPortList().get(0)); connection = connectionFactory.newConnection(); } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQMessagingTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQMessagingTest.java index 7c48b7f1..7e5054e3 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQMessagingTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQMessagingTest.java @@ -346,4 +346,9 @@ public void handleDelivery(String consumerTag, } + @Test(timeOut = 1000 * 5) + public void defaultEmptyExchangeTest() throws Exception { + defaultEmptyExchangeTest("vhost1", false); + } + } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestBase.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestBase.java index 036cbb1e..720fdc3f 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestBase.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/RabbitMQTestBase.java @@ -13,6 +13,8 @@ */ package io.streamnative.pulsar.handlers.amqp.rabbitmq; +import static java.nio.charset.StandardCharsets.UTF_8; + import com.google.common.collect.Sets; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.BuiltinExchangeType; @@ -204,4 +206,44 @@ public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProp System.out.println("[" + testName + "] Test finish. Receive total msg cnt: " + totalReceiveMsgCnt); Assert.assertEquals(expectedMsgCntPerQueue * queueList.size(), totalReceiveMsgCnt.get()); } + + public void defaultEmptyExchangeTest(String vhost, Boolean amqpProxyEnable) throws Exception { + @Cleanup + Connection connection = getConnection("vhost1", amqpProxyEnable); + @Cleanup + Channel channel = connection.createChannel(); + + final int queueCount = 5; + final int messageCount = 100; + final String contentMsg = "Hello AoP "; + + List queueNameList = new ArrayList<>(); + for (int i = 0; i < queueCount; i++) { + String queueName = randQuName(); + queueNameList.add(queueName); + channel.queueDeclare(queueName, true, false, false, null); + } + + for (String queueName : queueNameList) { + for (int i = 0; i < messageCount; i++) { + channel.basicPublish("", queueName, null, (contentMsg + i).getBytes(UTF_8)); + } + } + + AtomicInteger receiveMsgCount = new AtomicInteger(0); + CountDownLatch countDownLatch = new CountDownLatch(messageCount * queueCount); + for (String queueName : queueNameList) { + Consumer consumer = new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, + AMQP.BasicProperties properties, byte[] body) throws IOException { + receiveMsgCount.incrementAndGet(); + countDownLatch.countDown(); + } + }; + channel.basicConsume(queueName, consumer); + } + countDownLatch.await(); + Assert.assertEquals(messageCount * queueCount, receiveMsgCount.get()); + } } diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/DefaultExchange.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/DefaultExchange.java index 6287fb09..99603676 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/DefaultExchange.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/DefaultExchange.java @@ -20,28 +20,31 @@ import com.rabbitmq.client.AMQP; import com.rabbitmq.client.test.BrokerTestCase; import java.io.IOException; +import org.junit.Test; + /** * Testcase. */ public class DefaultExchange extends BrokerTestCase { - String queueName; + String queueName = "queue1"; @Override protected void createResources() throws IOException { - queueName = channel.queueDeclare().getQueue(); + channel.queueDeclare(queueName, true, false, false, null); } // See bug 22101: publish and declare are the only operations // permitted on the default exchange - //@Test - public void defaultExchangePublish() throws IOException { + @Test + public void defaultExchangePublish() throws Exception { basicPublishVolatile("", queueName); // Implicit binding + Thread.sleep(2000); assertDelivered(queueName, 1); } - //@Test - public void bindToDefaultExchange() throws IOException { + @Test + public void bindToDefaultExchange() { try { channel.queueBind(queueName, "", "foobar"); fail(); @@ -50,8 +53,8 @@ public void bindToDefaultExchange() throws IOException { } } - //@Test - public void unbindFromDefaultExchange() throws IOException { + @Test + public void unbindFromDefaultExchange() { try { channel.queueUnbind(queueName, "", queueName); fail(); @@ -60,8 +63,8 @@ public void unbindFromDefaultExchange() throws IOException { } } - //@Test - public void declareDefaultExchange() throws IOException { + @Test + public void declareDefaultExchange() { try { channel.exchangeDeclare("", "direct", true); fail(); @@ -70,8 +73,8 @@ public void declareDefaultExchange() throws IOException { } } - //@Test - public void deleteDefaultExchange() throws IOException { + @Test + public void deleteDefaultExchange() { try { channel.exchangeDelete(""); fail(); diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeletePredeclared.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeletePredeclared.java index 143dd8ef..4bca70d7 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeletePredeclared.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/rabbitmq/functional/ExchangeDeletePredeclared.java @@ -16,16 +16,21 @@ package io.streamnative.pulsar.handlers.amqp.rabbitmq.functional; +import static org.junit.Assert.fail; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.test.BrokerTestCase; import java.io.IOException; +import org.junit.Test; + /** * Testcase. */ public class ExchangeDeletePredeclared extends BrokerTestCase { + @Test public void testDeletingPredeclaredAmqExchange() throws IOException { try { channel.exchangeDelete("amq.fanout"); + fail(); } catch (IOException e) { checkShutdownSignal(AMQP.ACCESS_REFUSED, e); }