KumuluzEE AMQP project for development of messaging applications.
KumuluzEE AMQP enables you to easily send and receive messages over the AMQP protocol. Currently, it includes a RabbitMQ-based implementation. The RabbitMQ documentation can be found here.
You can enable KumuluzEE AMQP RabbitMQ support by adding the following dependency to pom.xml:
<dependency>
    <groupId>com.kumuluz.ee.amqp</groupId>
    <artifactId>kumuluzee-amqp-rabbitmq</artifactId>
    <version>${kumuluzee-amqp-rabbitmq.version}</version>
</dependency>
To use RabbitMQ, you first need to install the RabbitMQ broker. You can find the installation guide here.
To configure RabbitMQ, add the configuration to resources/config.yaml. There you can define your RabbitMQ hosts and their settings.
kumuluzee:
  amqp:
    rabbitmq:
      hosts: List
        name: String (required)
        url: String (required) - null
        password: String - null
        username: String - null
        port: Integer - null
        automaticRecoveryEnabled: Boolean - null
        channelRpcTimeout: Integer - null
        channelShouldCheckRpcResponseType: Boolean - null
        connectionTimeout: Integer - null
        enableHostnameVerification: Boolean - null
        handshakeTimeout: Integer - null
        networkRecoveryInterval: Integer - null
        requestedChannelMax: Integer - null
        requestedFrameMax: Integer - null
        requestedHeartbeat: Integer - null
        shutdownTimeout: Integer - null
        topologyRecoveryEnabled: Boolean - null
        uri: String - null
        virtualHost: String - null
        clientProperties: Map<String, Object> - null
        exchanges: List
          name: String (required)
          type: String [fanout, direct, topic, headers] - fanout
          durable: Boolean - false
          autoDelete: Boolean - false
          arguments: Map<String, Object> - null
        queues: List
          name: String (required)
          exclusive: Boolean - false
          durable: Boolean - false
          autoDelete: Boolean - false
          arguments: Map<String, Object> - null
      properties: List
        name: String (required)
        contentType: String - null
        contentEncoding: String - null
        headers: Map<String, Object> - null
        deliveryMode: Integer - null
        priority: Integer - null
        corelationId: String - null
        replyTo: String - null
        expiration: String - null
        messageId: String - null
        timestamp: Boolean - null
        type: String - null
        userId: String - null
        appId: String - null
        clusterId: String - null
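As a concrete illustration of the schema above, the following sketch defines two hosts and one named set of message properties. The names MQtest, MQtest2, directExchange, testQueue and testProperty are the ones used in the examples in this document. The url values, the contentType value and the exact nesting (exchanges and queues under each host, properties next to hosts) are assumptions, so check them against your own setup.

```yaml
kumuluzee:
  amqp:
    rabbitmq:
      hosts:
        - name: MQtest
          url: localhost            # assumed broker address
          exchanges:
            - name: directExchange
              type: direct
        - name: MQtest2
          url: localhost            # assumed broker address
          queues:
            - name: testQueue
      properties:
        - name: testProperty
          contentType: text/plain   # illustrative value
          timestamp: true
```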
You can create a connection to a server with parameters that are not available in config.yaml.
Annotate a class (all methods that return a map are considered) or a single method (which must return a map) with @AMQPConnection. In this method, create a new connection to the broker using the ConnectionFactory provided by RabbitMQ.
The method has to return a Map<String, Connection> object, where the String key is the name of the connection. You can then configure exchanges and queues in config.yaml under the name you selected. All other parameters for that host in config.yaml are ignored.
@AMQPConnection
public Map<String, Connection> localhostConnection() {
    Map<String, Connection> localhost = new HashMap<>();
    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    try {
        // Register the connection under the name used in config.yaml.
        localhost.put("MQtest", connectionFactory.newConnection());
    } catch (IOException | TimeoutException e) {
        // Leave the map empty instead of registering a null connection.
        log.severe("Connection could not be created: " + e.getMessage());
    }
    return localhost;
}
kumuluzee:
  amqp:
    rabbitmq:
      hosts:
        - name: MQtest
          port: 9000 #ignored
          automaticRecoveryEnabled: true #ignored
          exchanges:
            - name: directExchange
              type: direct
The connection to the server is managed by the framework and does not need to be closed. If you want to manage connections yourself, use the RabbitConnection class, which lets you create and close connections.
You can obtain a RabbitMQ Channel by using @AMQPChannel. You can then use this channel to send and receive messages (Read more).
@AMQPChannel(host: String - "")
@Inject
@AMQPChannel("hostName")
private Channel channel;
To listen to queues, annotate a method with @AMQPConsumer.
The first parameter of the annotated method is an object of the type you expect to receive. The second parameter is optional and must be of type MessageInfo, which gives you details about the received message.
If the message body cannot be deserialized to the expected type, the method is not invoked and an error is logged.
@AMQPConsumer(
    host: String - "",
    exchange: String - "",
    key: String[] - {""},
    prefetch: int - 100,
    autoAck: boolean - true
)
@AMQPConsumer(...)
public void listenToDirectExchange(
    message: any,
    info: MessageInfo - optional
) {
    ...
}
You can send messages by annotating a method with the @AMQPProducer annotation. The method must return an object, which is then sent to the consumer.
@AMQPProducer(
    host: String - "",
    exchange: String - "",
    key: String[] - {""},
    properties: String - ""
)
@AMQPProducer(host = "MQtest", exchange = "directExchange", key = "object")
public ExampleObject sendObject() {
    ExampleObject exampleObject = new ExampleObject();
    exampleObject.setContent("I'm just an object.");
    return exampleObject;
}
You can also return a Message object, in which you can define the host, exchange, keys, body and properties (including properties that cannot be defined in config.yaml). Keep in mind that Message parameters override annotation parameters.
@AMQPProducer
public Message sendFullMessage() {
    Message message = new Message();
    ExampleObject exampleObject = new ExampleObject();
    exampleObject.setContent("I'm an object in a special message");
    if (Math.random() < 0.5) {
        message.host("MQtest")
               .key(new String[]{"object"})
               .exchange("directExchange")
               .basicProperties(MessageProperties.BASIC);
    } else {
        message.host("MQtest2")
               .key(new String[]{"testQueue"})
               .basicProperties("testProperty");
    }
    return message.body(exampleObject);
}
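The rule that Message parameters take precedence over annotation parameters can be pictured with a small, framework-independent sketch. OverrideSketch, pickValue and pickKeys are hypothetical names used only for illustration and are not part of KumuluzEE AMQP. Treating null or an empty value as "not set on the Message" is an assumption, not the framework's documented behavior.

```java
/** Hypothetical illustration of "Message overrides annotation"; not KumuluzEE code. */
final class OverrideSketch {

    private OverrideSketch() {
    }

    /** Returns the Message value when it is set, otherwise the annotation value. */
    static String pickValue(String annotationValue, String messageValue) {
        // Assumption: null or "" means the Message did not set this parameter.
        if (messageValue == null || messageValue.isEmpty()) {
            return annotationValue;
        }
        return messageValue;
    }

    /** Same rule for routing keys, which are passed as arrays. */
    static String[] pickKeys(String[] annotationKeys, String[] messageKeys) {
        // Assumption: null or an empty array means no keys were set on the Message.
        if (messageKeys == null || messageKeys.length == 0) {
            return annotationKeys;
        }
        return messageKeys;
    }
}
```

With this rule, a host set on the Message replaces the host given in the annotation, while a parameter that the Message leaves unset falls back to the annotation value.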
To send a message to a specific queue, remove the exchange from the annotation and use the key as the queue name:
@AMQPProducer(host = "MQtest2", key = "testQueue")
public Message sendToQueue() {
    Message message = new Message();
    ExampleObject exampleObject = new ExampleObject();
    exampleObject.setContent("I'm an object in a message");
    return message.body(exampleObject).basicProperties(MessageProperties.BASIC);
}
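Sending to a queue without naming an exchange relies on the AMQP default exchange, which has an empty name and routes each message to the queue whose name equals the routing key. The toy in-memory model below is only a sketch of that routing rule. DefaultExchangeSketch and its methods are hypothetical and do not talk to a real broker or use the KumuluzEE or RabbitMQ APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Toy model of AMQP default-exchange routing; hypothetical, for illustration only. */
final class DefaultExchangeSketch {

    // Queue name -> messages waiting in that queue.
    private final Map<String, Deque<String>> queues = new HashMap<>();

    /** Declares a queue if it does not exist yet. */
    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    /**
     * Publishes to the default exchange (empty name). The routing key is
     * interpreted as a queue name. Returns false if no such queue exists.
     */
    boolean publish(String exchange, String routingKey, String body) {
        if (!exchange.isEmpty()) {
            throw new IllegalArgumentException("Only the default exchange is modelled");
        }
        Deque<String> queue = queues.get(routingKey);
        if (queue == null) {
            return false; // unroutable: no queue with that name
        }
        queue.addLast(body);
        return true;
    }

    /** Takes the oldest message from a queue, or null if the queue is empty or unknown. */
    String poll(String queueName) {
        Deque<String> queue = queues.get(queueName);
        return queue == null ? null : queue.pollFirst();
    }
}
```

In this model, publishing with an empty exchange and the key "testQueue" places the message in the queue named testQueue, which mirrors what the annotation in the example above asks for.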
You can start by using the sample code.
Recent changes can be viewed on GitHub on the Releases page.
See the contributing docs.
When submitting an issue, please follow the guidelines.
When submitting a bugfix, write a test that exposes the bug and fails before applying your fix. Submit the test alongside the fix.
When submitting a new feature, add tests that cover the feature.
MIT