Skip to content

Commit

Permalink
Add support of default exchange (#104)
Browse files Browse the repository at this point in the history
Motivation
Add support of default exchange.

Modifications
Impl queue delete;
When queue created, add binding to default exchang; and delete the binding when the queue deleted.
  • Loading branch information
zhanghaou authored Jun 13, 2020
1 parent dc711b4 commit e0ca154
Show file tree
Hide file tree
Showing 13 changed files with 265 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ public abstract class AbstractAmqpExchange implements AmqpExchange {
protected Set<AmqpQueue> queues;
protected boolean durable;
protected boolean autoDelete;
public static final String DEFAULT_EXCHANGE = "aop.direct.memory";
public static final String DEFAULT_EXCHANGE_DURABLE = "aop.direct.durable";

protected AbstractAmqpExchange(String exchangeName, AmqpExchange.Type exchangeType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,32 +160,17 @@ public void receiveExchangeDeclare(AMQShortString exchange, AMQShortString type,
type, passive, durable, autoDelete, internal, nowait, arguments);
}

String exchangeName = exchange.toString().
replaceAll("\r", "").
replaceAll("\n", "").trim();
final MethodRegistry methodRegistry = connection.getMethodRegistry();
final AMQMethodBody declareOkBody = methodRegistry.createExchangeDeclareOkBody();

if (isDefaultExchange(exchange)) {
if (!AMQShortString.createAMQShortString(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).equals(type)) {
StringBuffer sb = new StringBuffer();
sb.append("Attempt to redeclare default exchange: of type")
.append(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).append("to").append(type).append(".");
connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, sb.toString(), channelId);
} else {
// in-memory integration
if (!durable) {
InMemoryExchange inMemoryExchange = new InMemoryExchange(
exchangeName, AmqpExchange.Type.value(type.toString()), autoDelete);
ExchangeContainer.putExchange(connection.getNamespaceName(), exchange.toString(), inMemoryExchange);
}
if (!nowait) {
sync();
// if declare a default exchange, return success.
connection.writeFrame(declareOkBody.generateFrame(channelId));
}
}
StringBuffer sb = new StringBuffer();
sb.append("Attempt to redeclare default exchange: of type")
.append(ExchangeDefaults.DIRECT_EXCHANGE_CLASS).append("to").append(type).append(".");
connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, sb.toString(), channelId);
} else {
String exchangeName = exchange.toString().
replaceAll("\r", "").
replaceAll("\n", "").trim();
final MethodRegistry methodRegistry = connection.getMethodRegistry();
final AMQMethodBody declareOkBody = methodRegistry.createExchangeDeclareOkBody();
AmqpExchange amqpExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), exchangeName);
if (passive) {
if (null == amqpExchange) {
Expand Down Expand Up @@ -301,12 +286,16 @@ public void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boo
log.debug("RECV[{}] ExchangeDelete[ exchange: {}, ifUnused: {}, nowait:{} ]", channelId, exchange, ifUnused,
nowait);
}
String exchangeName = exchange.toString().
replaceAll("\r", "").
replaceAll("\n", "").trim();
if (isDefaultExchange(exchange)) {
connection.sendConnectionClose(ErrorCodes.NOT_ALLOWED, "Default Exchange cannot be deleted. ", channelId);
connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, "Default Exchange cannot be deleted. ",
channelId);
} else if (isBuildInExchange(exchange)) {
connection.sendConnectionClose(ErrorCodes.ACCESS_REFUSED, "BuildIn Exchange cannot be deleted. ",
channelId);
} else {
String exchangeName = exchange.toString().
replaceAll("\r", "").
replaceAll("\n", "").trim();
AmqpExchange amqpExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), exchangeName);
if (null == amqpExchange) {
closeChannel(ErrorCodes.NOT_FOUND, "No such exchange: '" + exchange + "'");
Expand Down Expand Up @@ -421,8 +410,7 @@ public void receiveQueueDeclare(AMQShortString queue, boolean passive, boolean d
}
QueueContainer.putQueue(connection.getNamespaceName(), queue.toString(), amqpQueue);
setDefaultQueue(amqpQueue);
// return success.
// when call QueueBind, then create Pulsar sub.

MethodRegistry methodRegistry = connection.getMethodRegistry();
QueueDeclareOkBody responseBody = methodRegistry.createQueueDeclareOkBody(queue, 0, 0);
connection.writeFrame(responseBody.generateFrame(channelId));
Expand All @@ -438,7 +426,7 @@ public void receiveQueueBind(AMQShortString queue, AMQShortString exchange, AMQS
channelId, queue, exchange, bindingKey, nowait, argumentsTable);
}
Map<String, Object> arguments = FieldTable.convertToMap(argumentsTable);

String exchangeName;
AmqpQueue amqpQueue;
if (queue == null) {
amqpQueue = getDefaultQueue();
Expand All @@ -456,11 +444,20 @@ public void receiveQueueBind(AMQShortString queue, AMQShortString exchange, AMQS
}
checkExclusiveQueue(amqpQueue);

AmqpExchange amqpExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), exchange.toString());
if (isDefaultExchange(exchange)) {
exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE;
} else {
exchangeName = exchange.toString();
}
AmqpExchange amqpExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), exchangeName);
if (amqpExchange == null) {
closeChannel(ErrorCodes.NOT_FOUND, "No such exchange: '" + exchange + "'");
return;
}
if (exchangeName.equals(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE)){
closeChannel(ErrorCodes.ACCESS_REFUSED, "Can not bind to default exchange ");
return;
}

AmqpMessageRouter messageRouter = AbstractAmqpMessageRouter.generateRouter(amqpExchange.getType());
if (messageRouter == null) {
Expand Down Expand Up @@ -507,12 +504,28 @@ public void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean i
log.debug("RECV[{}] QueueDelete[ queue: {}, ifUnused:{}, ifEmpty:{}, nowait:{} ]", channelId, queue,
ifUnused, ifEmpty, nowait);
}
// TODO
// return success.
MethodRegistry methodRegistry = connection.getMethodRegistry();
QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(1);
connection.writeFrame(responseBody.generateFrame(channelId));

AmqpQueue amqpQueue;
if (queue == null) {
//get the default queue on the channel:
amqpQueue = getDefaultQueue();
} else {
amqpQueue = QueueContainer.getQueue(connection.getNamespaceName(), queue.toString());
}
if (amqpQueue == null) {
closeChannel(ErrorCodes.NOT_FOUND, "Queue '" + queue.toString() + "' does not exist.");
} else {
checkExclusiveQueue(amqpQueue);
if (!nowait) {
sync();
QueueContainer.deleteQueue(connection.getNamespaceName(), queue.toString());
AmqpExchange defaultExchange = ExchangeContainer.getExchange(connection.getNamespaceName(),
AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE);
amqpQueue.unbindExchange(defaultExchange);
MethodRegistry methodRegistry = connection.getMethodRegistry();
QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(0);
connection.writeFrame(responseBody.generateFrame(channelId));
}
}
}

@Override
Expand All @@ -523,26 +536,43 @@ public void receiveQueueUnbind(AMQShortString queue, AMQShortString exchange, AM
exchange, bindingKey, arguments);
}

// in-memory integration
AmqpQueue amqpQueue = QueueContainer.getQueue(connection.getNamespaceName(), queue.toString());
checkExclusiveQueue(amqpQueue);
String exchangeName;
if (isDefaultExchange(exchange)) {
exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE;
} else {
exchangeName = exchange.toString();
}
AmqpExchange amqpExchange = ExchangeContainer.
getExchange(connection.getNamespaceName(), exchange.toString());
getExchange(connection.getNamespaceName(), exchangeName);
if (exchangeName.equals(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE)) {
closeChannel(ErrorCodes.ACCESS_REFUSED, "Can not unbind to default exchange ");
return;
}
if (amqpQueue instanceof InMemoryQueue && amqpExchange instanceof InMemoryExchange) {
amqpQueue.unbindExchange(amqpExchange);
if (amqpExchange.getAutoDelete() && (amqpExchange.getQueueSize() == 0)) {
ExchangeContainer.deleteExchange(connection.getNamespaceName(), exchangeName);
}
AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
return;
}

TopicName topicName = TopicName.get(TopicDomain.persistent.value(),
connection.getNamespaceName(), exchange.toString());
connection.getNamespaceName(), exchangeName);

Topic topic = AmqpTopicManager.getOrCreateTopic(connection.getPulsarService(), topicName.toString(), false);
if (null == topic) {
connection.sendConnectionClose(INTERNAL_ERROR, "exchange not found.", channelId);
} else {
try {
amqpQueue.unbindExchange(amqpExchange);
if (amqpExchange.getAutoDelete() && (amqpExchange.getQueueSize() == 0)) {
ExchangeContainer.deleteExchange(connection.getNamespaceName(), exchange.toString());
topic.delete().get();
}
final AMQMethodBody responseBody = connection.getMethodRegistry().createQueueUnbindOkBody();
connection.writeFrame(responseBody.generateFrame(channelId));
} catch (Exception e) {
Expand Down Expand Up @@ -699,16 +729,18 @@ public void receiveBasicCancel(AMQShortString consumerTag, boolean noWait) {
}

@Override
public void receiveBasicPublish(AMQShortString exchangeName, AMQShortString routingKey, boolean mandatory,
public void receiveBasicPublish(AMQShortString exchange, AMQShortString routingKey, boolean mandatory,
boolean immediate) {
if (log.isDebugEnabled()) {
log.debug("RECV[{}] BasicPublish[exchange: {} routingKey: {} mandatory: {} immediate: {}]",
channelId, exchangeName, routingKey, mandatory, immediate);
channelId, exchange, routingKey, mandatory, immediate);
}
String exchangeName;

if (exchangeName == null || exchangeName.length() == 0) {
if (exchange == null || exchange.length() == 0) {
AmqpExchange amqpExchange = ExchangeContainer.
getExchange(connection.getNamespaceName(), AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE);
exchangeName = amqpExchange.getName();
AmqpQueue amqpQueue = QueueContainer.getQueue(connection.getNamespaceName(), routingKey.toString());

if (amqpQueue == null) {
Expand All @@ -717,13 +749,17 @@ public void receiveBasicPublish(AMQShortString exchangeName, AMQShortString rout
return;
}

// if (amqpQueue.getRouter("") == null) {
// AmqpMessageRouter amqpMessageRouter = new DirectMessageRouter(
// AmqpMessageRouter.Type.Direct, routingKey.toString());
// amqpQueue.bindExchange(amqpExchange, amqpMessageRouter);
// }
// bind to default exchange.
if (amqpQueue.getRouter(exchangeName) == null) {
amqpQueue.bindExchange(amqpExchange,
AbstractAmqpMessageRouter.generateRouter(AmqpExchange.Type.Direct),
routingKey.toString(), null);
}
} else {
exchangeName = exchange.toString();
}
MessagePublishInfo info = new MessagePublishInfo(exchangeName, immediate, mandatory, routingKey);
MessagePublishInfo info = new MessagePublishInfo(AMQShortString.valueOf(exchangeName), immediate,
mandatory, routingKey);
setPublishFrame(info, null);
}

Expand Down Expand Up @@ -888,14 +924,9 @@ private void deliverCurrentMessageIfComplete() {
return;
}

AmqpQueue amqpQueue = QueueContainer.getQueue(connection.getNamespaceName(), routingKey);
AmqpExchange amqpExchange;
if (exchangeName == null || exchangeName.length() == 0) {
if (amqpQueue.getDurable()) {
exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE;
} else {
exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE;
}
exchangeName = AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE;
}
amqpExchange = ExchangeContainer.getExchange(connection.getNamespaceName(), exchangeName);
if (amqpExchange == null) {
Expand Down Expand Up @@ -1119,6 +1150,16 @@ private boolean isDefaultExchange(final AMQShortString exchangeName) {
return exchangeName == null || AMQShortString.EMPTY_STRING.equals(exchangeName);
}

private boolean isBuildInExchange(final AMQShortString exchangeName) {
if (exchangeName.toString().equals(ExchangeDefaults.DIRECT_EXCHANGE_NAME)
|| (exchangeName.toString().equals(ExchangeDefaults.FANOUT_EXCHANGE_NAME))
|| (exchangeName.toString().equals(ExchangeDefaults.TOPIC_EXCHANGE_NAME))) {
return true;
} else {
return false;
}
}

private void closeChannel(int cause, final String message) {
connection.closeChannelAndWriteFrame(this, cause, message);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.protocol.ErrorCodes;
Expand Down Expand Up @@ -248,15 +250,23 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap

assertState(ConnectionState.AWAIT_OPEN);

boolean isDefaultNamespace = false;
String virtualHostStr = AMQShortString.toString(virtualHost);
if ((virtualHostStr != null) && virtualHostStr.charAt(0) == '/') {
virtualHostStr = virtualHostStr.substring(1);
if (StringUtils.isEmpty(virtualHostStr)){
virtualHostStr = DEFAULT_NAMESPACE;
isDefaultNamespace = true;
}
}

NamespaceName namespaceName = NamespaceName.get(amqpConfig.getAmqpTenant(), virtualHostStr);
if (isDefaultNamespace) {
// avoid the namespace public/default is not owned in standalone mode
TopicName topic = TopicName.get(TopicDomain.persistent.value(),
namespaceName, "__lookup__");
getPulsarService().getNamespaceService().getBrokerServiceUrlAsync(topic, true);
}
// Policies policies = getPolicies(namespaceName);
// if (policies != null) {
this.namespaceName = namespaceName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ public void start(BrokerService service) {
brokerService = service;

ConnectionContainer.init(brokerService.getPulsar());
ExchangeContainer.init(brokerService.getPulsar());

if (amqpConfig.isAmqpProxyEnable()) {
ProxyConfiguration proxyConfig = new ProxyConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,14 @@

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.streamnative.pulsar.handlers.amqp.impl.InMemoryExchange;
import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange;
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.qpid.server.exchange.ExchangeDefaults;
import org.apache.zookeeper.ZooKeeper;

/**
Expand All @@ -46,7 +42,7 @@ public static void init(PulsarService pulsarService) {
@Override
public void onLoad(NamespaceBundle namespaceBundle) {
log.info("ConnectionContainer [onLoad] namespaceBundle: {}", namespaceBundle);
defaultExchangeInit(namespaceBundle.getNamespaceObject());
ExchangeContainer.initBuildInExchange(namespaceBundle.getNamespaceObject());
}

@Override
Expand Down Expand Up @@ -99,33 +95,4 @@ public static void removeConnection(NamespaceName namespaceName, AmqpConnection
connectionMap.getOrDefault(namespaceName, Collections.emptySet()).remove(amqpConnection);
}

private static void defaultExchangeInit(NamespaceName namespaceName) {
AmqpExchange inMemoryExchange = ExchangeContainer.getExchange(namespaceName,
AbstractAmqpExchange.DEFAULT_EXCHANGE);

if (inMemoryExchange == null) {
ExchangeContainer.putExchange(namespaceName, AbstractAmqpExchange.DEFAULT_EXCHANGE,
new InMemoryExchange("", AmqpExchange.Type.Direct, false));
}

addBuildInExchanges(namespaceName, AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE, AmqpExchange.Type.Direct);
addBuildInExchanges(namespaceName, ExchangeDefaults.DIRECT_EXCHANGE_NAME, AmqpExchange.Type.Direct);
addBuildInExchanges(namespaceName, ExchangeDefaults.FANOUT_EXCHANGE_NAME, AmqpExchange.Type.Fanout);
addBuildInExchanges(namespaceName, ExchangeDefaults.TOPIC_EXCHANGE_NAME, AmqpExchange.Type.Topic);
}

private static void addBuildInExchanges(NamespaceName namespaceName,
String exchangeName, AmqpExchange.Type exchangeType) {
String topicName = PersistentExchange.getExchangeTopicName(namespaceName, exchangeName);
AmqpTopicManager.getTopic(pulsarService, topicName, true).whenComplete((topic, throwable) -> {
if (throwable != null) {
log.error("Create default exchange topic failed. errorMsg: {}", throwable.getMessage(), throwable);
return;
}
ExchangeContainer.putExchange(namespaceName, exchangeName,
new PersistentExchange(exchangeName, exchangeType,
(PersistentTopic) topic, false));
});
}

}
Loading

0 comments on commit e0ca154

Please sign in to comment.