Skip to content

Commit

Permalink
Support basic pub-sub for Qpid-JMS client (#122)
Browse files Browse the repository at this point in the history
Fixes #114 

### Motivation

There are some obstacles to use Qpid-JMS client work with AoP. We could walk around some features first to make the Qpid-JMS support the basic pub-sub operation.

### Modifications

Walk around some features as below.

1. Security process in `AmqpConnection.receiveConnectionStartOk` method.
2. The `channelFlow` process  in `AmqpChannel.receiveChannelFlow` method.
3. Transaction related process in `AmqpChannel.receiveTxSelect`, `AmqpChannel.receiveTxCommit` and `AmqpChannel.receiveTxRollback` methods.

### Verifying this change

Add Qpid-JMS client test.
  • Loading branch information
gaoran10 committed Jun 26, 2020
1 parent 332c1ff commit f2d4f86
Show file tree
Hide file tree
Showing 22 changed files with 553 additions and 345 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import org.apache.qpid.server.protocol.v0_8.transport.BasicCancelOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.BasicContentHeaderProperties;
import org.apache.qpid.server.protocol.v0_8.transport.BasicNackBody;
import org.apache.qpid.server.protocol.v0_8.transport.ChannelFlowOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ConfirmSelectOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ContentBody;
import org.apache.qpid.server.protocol.v0_8.transport.ContentHeaderBody;
Expand All @@ -77,6 +78,9 @@
import org.apache.qpid.server.protocol.v0_8.transport.QueueDeclareOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.QueueDeleteOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.ServerChannelMethodProcessor;
import org.apache.qpid.server.protocol.v0_8.transport.TxCommitOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.TxRollbackOkBody;
import org.apache.qpid.server.protocol.v0_8.transport.TxSelectOkBody;
import org.apache.qpid.server.txn.AsyncCommand;
import org.apache.qpid.server.txn.ServerTransaction;

Expand Down Expand Up @@ -127,11 +131,14 @@ public class AmqpChannel implements ServerChannelMethodProcessor {
private final AtomicBoolean blockedOnCredit = new AtomicBoolean(false);
public static final int DEFAULT_CONSUMER_PERMIT = 1000;

public AmqpChannel(int channelId, AmqpConnection connection) {
private final AmqpTopicManager amqpTopicManager;

public AmqpChannel(int channelId, AmqpConnection connection, AmqpTopicManager amqpTopicManager) {
this.channelId = channelId;
this.connection = connection;
this.unacknowledgedMessageMap = new UnacknowledgedMessageMap(this);
this.creditManager = new AmqpFlowCreditManager(0, 0);
this.amqpTopicManager = amqpTopicManager;
}

@Override
Expand Down Expand Up @@ -178,9 +185,7 @@ public void receiveExchangeDeclare(AMQShortString exchange, AMQShortString type,
if (durable) {
TopicName topicName = TopicName.get(
TopicDomain.persistent.value(), connection.getNamespaceName(), exchangeName);
CompletableFuture<Topic> tf = AmqpTopicManager.getTopic(connection.getPulsarService(),
topicName.toString(),
false);
CompletableFuture<Topic> tf = amqpTopicManager.getTopic(topicName.toString(), false);
tf.whenComplete((t, e) -> {
if (e != null) {
closeChannel(INTERNAL_ERROR, e.getMessage());
Expand Down Expand Up @@ -254,8 +259,8 @@ public void receiveExchangeDeclare(AMQShortString exchange, AMQShortString type,
connection.getNamespaceName(), exchangeName);
try {
PersistentTopic persistentTopic =
(PersistentTopic) AmqpTopicManager.getOrCreateTopic(connection.getPulsarService(),
exchangeTopicName, true);
(PersistentTopic) amqpTopicManager.getOrCreateTopic(
exchangeTopicName, true);
if (persistentTopic == null) {
connection.sendConnectionClose(INTERNAL_ERROR,
"AOP Create Exchange failed.", channelId);
Expand Down Expand Up @@ -341,7 +346,7 @@ public void receiveExchangeBound(AMQShortString exchange, AMQShortString routing
TopicName topicName = TopicName.get(TopicDomain.persistent.value(),
connection.getNamespaceName(), exchange.toString());

Topic topic = AmqpTopicManager.getOrCreateTopic(connection.getPulsarService(), topicName.toString(), false);
Topic topic = amqpTopicManager.getOrCreateTopic(topicName.toString(), false);
if (null == topic) {
replyCode = ExchangeBoundOkBody.EXCHANGE_NOT_FOUND;
replyText = replyText.insert(0, "Exchange '").append(exchange).append("' not found");
Expand Down Expand Up @@ -399,8 +404,8 @@ public void receiveQueueDeclare(AMQShortString queue, boolean passive, boolean d
try {
String queueTopicName = PersistentQueue.getQueueTopicName(
connection.getNamespaceName(), queue.toString());
PersistentTopic indexTopic = (PersistentTopic) AmqpTopicManager
.getOrCreateTopic(connection.getPulsarService(), queueTopicName, true);
PersistentTopic indexTopic = (PersistentTopic) amqpTopicManager.getOrCreateTopic(
queueTopicName, true);
amqpQueue = new PersistentQueue(queue.toString(), indexTopic, connection.getConnectionId(),
exclusive, autoDelete);
} catch (Exception e) {
Expand Down Expand Up @@ -564,7 +569,7 @@ public void receiveQueueUnbind(AMQShortString queue, AMQShortString exchange, AM
TopicName topicName = TopicName.get(TopicDomain.persistent.value(),
connection.getNamespaceName(), exchangeName);

Topic topic = AmqpTopicManager.getOrCreateTopic(connection.getPulsarService(), topicName.toString(), false);
Topic topic = amqpTopicManager.getOrCreateTopic(topicName.toString(), false);
if (null == topic) {
connection.sendConnectionClose(INTERNAL_ERROR, "exchange not found.", channelId);
} else {
Expand Down Expand Up @@ -807,12 +812,19 @@ public void receiveBasicGet(AMQShortString queue, boolean noAck) {

@Override
public void receiveChannelFlow(boolean active) {

if (log.isDebugEnabled()) {
log.debug("RECV[{}] ChannelFlow[active: {}]", channelId, active);
}
// TODO channelFlow process
ChannelFlowOkBody body = connection.getMethodRegistry().createChannelFlowOkBody(true);
connection.writeFrame(body.generateFrame(channelId));
}

@Override
public void receiveChannelFlowOk(boolean active) {

if (log.isDebugEnabled()) {
log.debug("RECV[{}] ChannelFlowOk[active: {}]", channelId, active);
}
}

@Override
Expand Down Expand Up @@ -1085,17 +1097,32 @@ public void receiveBasicReject(long deliveryTag, boolean requeue) {

@Override
public void receiveTxSelect() {

if (log.isDebugEnabled()) {
log.debug("RECV[{}] TxSelect", channelId);
}
// TODO txSelect process
TxSelectOkBody txSelectOkBody = connection.getMethodRegistry().createTxSelectOkBody();
connection.writeFrame(txSelectOkBody.generateFrame(channelId));
}

@Override
public void receiveTxCommit() {

if (log.isDebugEnabled()) {
log.debug("RECV[{}] TxCommit", channelId);
}
// TODO txCommit process
TxCommitOkBody txCommitOkBody = connection.getMethodRegistry().createTxCommitOkBody();
connection.writeFrame(txCommitOkBody.generateFrame(channelId));
}

@Override
public void receiveTxRollback() {

if (log.isDebugEnabled()) {
log.debug("RECV[{}] TxRollback", channelId);
}
// TODO txRollback process
TxRollbackOkBody txRollbackBody = connection.getMethodRegistry().createTxRollbackOkBody();
connection.writeFrame(txRollbackBody.generateFrame(channelId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,14 @@ public class AmqpChannelInitializer extends ChannelInitializer<SocketChannel> {
private final PulsarService pulsarService;
@Getter
private final AmqpServiceConfiguration amqpConfig;
private final AmqpTopicManager amqpTopicManager;

public AmqpChannelInitializer(PulsarService pulsarService,
AmqpServiceConfiguration amqpConfig) {
public AmqpChannelInitializer(PulsarService pulsarService, AmqpServiceConfiguration amqpConfig,
AmqpTopicManager amqpTopicManager) {
super();
this.pulsarService = pulsarService;
this.amqpConfig = amqpConfig;
this.amqpTopicManager = amqpTopicManager;
}

@Override
Expand All @@ -49,7 +51,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast("frameEncoder",
new AmqpEncoder());
ch.pipeline().addLast("handler",
new AmqpConnection(pulsarService, amqpConfig));
new AmqpConnection(pulsarService, amqpConfig, amqpTopicManager));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,11 @@ enum ConnectionState {
private AmqpOutputConverter amqpOutputConverter;
private ServerCnx pulsarServerCnx;

public AmqpConnection(PulsarService pulsarService, AmqpServiceConfiguration amqpConfig) {
@Getter
private final AmqpTopicManager amqpTopicManager;

public AmqpConnection(PulsarService pulsarService, AmqpServiceConfiguration amqpConfig,
AmqpTopicManager amqpTopicManager) {
super(pulsarService, amqpConfig);
this.connectionId = ID_GENERATOR.incrementAndGet();
this.channels = new ConcurrentLongHashMap<>();
Expand All @@ -115,8 +119,10 @@ public AmqpConnection(PulsarService pulsarService, AmqpServiceConfiguration amqp
this.maxFrameSize = amqpConfig.getAmqpMaxFrameSize();
this.heartBeat = amqpConfig.getAmqpHeartBeat();
this.amqpOutputConverter = new AmqpOutputConverter(this);
this.amqpTopicManager = amqpTopicManager;
}


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
Expand Down Expand Up @@ -178,9 +184,19 @@ public void receiveConnectionStartOk(FieldTable clientProperties, AMQShortString
}
assertState(ConnectionState.AWAIT_START_OK);
// TODO clientProperties
AMQMethodBody responseBody = this.methodRegistry.createConnectionSecureBody(new byte[0]);
writeFrame(responseBody.generateFrame(0));
state = ConnectionState.AWAIT_SECURE_OK;

// TODO security process
// AMQMethodBody responseBody = this.methodRegistry.createConnectionSecureBody(new byte[0]);
// writeFrame(responseBody.generateFrame(0));
// state = ConnectionState.AWAIT_SECURE_OK;

ConnectionTuneBody tuneBody =
methodRegistry.createConnectionTuneBody(maxChannels,
maxFrameSize,
heartBeat);

writeFrame(tuneBody.generateFrame(0));
state = ConnectionState.AWAIT_TUNE_OK;
}

@Override
Expand Down Expand Up @@ -349,7 +365,7 @@ public void receiveChannelOpen(int channelId) {
channelId);
} else {
log.debug("Connecting to: {}", namespaceName.getLocalName());
final AmqpChannel channel = new AmqpChannel(channelId, this);
final AmqpChannel channel = new AmqpChannel(channelId, this, amqpTopicManager);
addChannel(channel);

ChannelOpenOkBody response = getMethodRegistry().createChannelOpenOkBody();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public class AmqpProtocolHandler implements ProtocolHandler {
@Getter
private String bindAddress;

private AmqpTopicManager amqpTopicManager;
private ExchangeContainer exchangeContainer;
private QueueContainer queueContainer;
private ConnectionContainer connectionContainer;

@Override
public String protocolName() {
Expand Down Expand Up @@ -86,8 +90,10 @@ public String getProtocolDataToAdvertise() {
public void start(BrokerService service) {
brokerService = service;

ConnectionContainer.init(brokerService.getPulsar());
ExchangeContainer.init(brokerService.getPulsar());
this.amqpTopicManager = new AmqpTopicManager(brokerService.getPulsar());
this.exchangeContainer = new ExchangeContainer(this.amqpTopicManager);
this.queueContainer = new QueueContainer();
this.connectionContainer = new ConnectionContainer(brokerService.getPulsar(), this.exchangeContainer);

if (amqpConfig.isAmqpProxyEnable()) {
ProxyConfiguration proxyConfig = new ProxyConfiguration();
Expand Down Expand Up @@ -130,8 +136,7 @@ public Map<InetSocketAddress, ChannelInitializer<SocketChannel>> newChannelIniti
if (listener.startsWith(PLAINTEXT_PREFIX)) {
builder.put(
new InetSocketAddress(brokerService.pulsar().getBindAddress(), getListenerPort(listener)),
new AmqpChannelInitializer(brokerService.pulsar(),
amqpConfig));
new AmqpChannelInitializer(brokerService.pulsar(), amqpConfig, amqpTopicManager));
} else {
log.error("Amqp listener {} not supported. supports {} and {}",
listener, PLAINTEXT_PREFIX, SSL_PREFIX);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,17 @@
@Slf4j
public class AmqpTopicManager {

private PulsarService pulsarService;

public static Topic getOrCreateTopic(PulsarService pulsarService, String topicName, boolean createIfMissing) {
return getTopic(pulsarService, topicName, createIfMissing).join();
public AmqpTopicManager(PulsarService pulsarService) {
this.pulsarService = pulsarService;
}

public static CompletableFuture<Topic> getTopic(PulsarService pulsarService, String topicName,
boolean createIfMissing) {
public Topic getOrCreateTopic(String topicName, boolean createIfMissing) {
return getTopic(topicName, createIfMissing).join();
}

public CompletableFuture<Topic> getTopic(String topicName, boolean createIfMissing) {
CompletableFuture<Topic> topicCompletableFuture = new CompletableFuture<>();
if (null == pulsarService) {
log.error("PulsarService is not set.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,26 +23,26 @@
import org.apache.pulsar.broker.namespace.NamespaceBundleOwnershipListener;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.zookeeper.ZooKeeper;

/**
* Connection container, listen bundle unload event, release connection resource.
*/
@Slf4j
public class ConnectionContainer {

private static PulsarService pulsarService;
public static ZooKeeper zooKeeper;
private ExchangeContainer exchangeContainer;
private static Map<NamespaceName, Set<AmqpConnection>> connectionMap = Maps.newConcurrentMap();

public static void init(PulsarService pulsarService) {
ConnectionContainer.pulsarService = pulsarService;
ConnectionContainer.zooKeeper = pulsarService.getLocalZkCache().getZooKeeper();
public ConnectionContainer(PulsarService pulsarService, ExchangeContainer exchangeContainer) {
this.exchangeContainer = exchangeContainer;
pulsarService.getNamespaceService().addNamespaceBundleOwnershipListener(new NamespaceBundleOwnershipListener() {
@Override
public void onLoad(NamespaceBundle namespaceBundle) {
log.info("ConnectionContainer [onLoad] namespaceBundle: {}", namespaceBundle);
ExchangeContainer.initBuildInExchange(namespaceBundle.getNamespaceObject());
int brokerPort = pulsarService.getBrokerListenPort().isPresent()
? pulsarService.getBrokerListenPort().get() : 0;
log.info("ConnectionContainer [onLoad] namespaceBundle: {}, brokerPort: {}",
namespaceBundle, brokerPort);
ConnectionContainer.this.exchangeContainer.initBuildInExchange(namespaceBundle.getNamespaceObject());
}

@Override
Expand Down Expand Up @@ -92,6 +92,9 @@ public static void addConnection(NamespaceName namespaceName, AmqpConnection amq
}

public static void removeConnection(NamespaceName namespaceName, AmqpConnection amqpConnection) {
if (namespaceName == null) {
return;
}
connectionMap.getOrDefault(namespaceName, Collections.emptySet()).remove(amqpConnection);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,29 +22,25 @@
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.qpid.server.exchange.ExchangeDefaults;


/**
* Container for all exchanges in the broker.
*/
@Slf4j
public class ExchangeContainer {

private static final Map<String, AmqpExchange.Type> BUILDIN_EXCHANGE_NAME_SET = new HashMap<>();
private static PulsarService pulsarService;
private static final Map<String, AmqpExchange.Type> BUILDIN_EXCHANGE_NAME_MAP = new HashMap<>();
private AmqpTopicManager amqpTopicManager;

public static void init(PulsarService pulsarService) {
if (ExchangeContainer.pulsarService == null) {
ExchangeContainer.pulsarService = pulsarService;
BUILDIN_EXCHANGE_NAME_SET.put(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE, AmqpExchange.Type.Direct);
BUILDIN_EXCHANGE_NAME_SET.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AmqpExchange.Type.Direct);
BUILDIN_EXCHANGE_NAME_SET.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AmqpExchange.Type.Fanout);
BUILDIN_EXCHANGE_NAME_SET.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AmqpExchange.Type.Topic);
}
public ExchangeContainer(AmqpTopicManager amqpTopicManager) {
this.amqpTopicManager = amqpTopicManager;
BUILDIN_EXCHANGE_NAME_MAP.put(AbstractAmqpExchange.DEFAULT_EXCHANGE_DURABLE, AmqpExchange.Type.Direct);
BUILDIN_EXCHANGE_NAME_MAP.put(ExchangeDefaults.DIRECT_EXCHANGE_NAME, AmqpExchange.Type.Direct);
BUILDIN_EXCHANGE_NAME_MAP.put(ExchangeDefaults.FANOUT_EXCHANGE_NAME, AmqpExchange.Type.Fanout);
BUILDIN_EXCHANGE_NAME_MAP.put(ExchangeDefaults.TOPIC_EXCHANGE_NAME, AmqpExchange.Type.Topic);
}

@Getter
Expand Down Expand Up @@ -81,26 +77,30 @@ public static void deleteExchange(NamespaceName namespaceName, String exchangeNa
}
}

public static void initBuildInExchange(NamespaceName namespaceName) {
for (Map.Entry<String, AmqpExchange.Type> entry : BUILDIN_EXCHANGE_NAME_SET.entrySet()) {
public void initBuildInExchange(NamespaceName namespaceName) {
for (Map.Entry<String, AmqpExchange.Type> entry : BUILDIN_EXCHANGE_NAME_MAP.entrySet()) {
if (getExchange(namespaceName, entry.getKey()) == null) {
addBuildInExchanges(namespaceName, entry.getKey(), entry.getValue());
}
}
}

private static void addBuildInExchanges(NamespaceName namespaceName,
String exchangeName, AmqpExchange.Type exchangeType) {
private void addBuildInExchanges(NamespaceName namespaceName,
String exchangeName, AmqpExchange.Type exchangeType) {
String topicName = PersistentExchange.getExchangeTopicName(namespaceName, exchangeName);
AmqpTopicManager.getTopic(pulsarService, topicName, true).whenComplete((topic, throwable) -> {
amqpTopicManager.getTopic(topicName, true).whenComplete((topic, throwable) -> {
if (throwable != null) {
log.error("Create default exchange topic failed. errorMsg: {}", throwable.getMessage(), throwable);
return;
}
ExchangeContainer.putExchange(namespaceName, exchangeName,
new PersistentExchange(exchangeName, exchangeType,
(PersistentTopic) topic, false));
log.info("[addBuildInExchanges] namespaceName: {}, exchangeName: {}", namespaceName, exchangeName);
});
}

public static Map<String, AmqpExchange.Type> getBuildinExchangeNameMap() {
return BUILDIN_EXCHANGE_NAME_MAP;
}
}
Loading

0 comments on commit f2d4f86

Please sign in to comment.