-
Notifications
You must be signed in to change notification settings - Fork 691
Java message
Roy edited this page Nov 6, 2017
·
19 revisions
本指南以1.4.5版本的java客户端为起点编写。
- 简单例子
- 消息
- 客户端配置
- 会话工厂MessageSessionFactory
- 发送消息MessageProducer
- 订阅消息MessageConsumer
- 遍历消息TopicBrowser
- Spring框架支持
- 高级主题
消息作为逻辑业务的载体,在不同系统之间流转,而流转的中介就是所谓消息中间件MQ。可以这么比喻,消息是邮包,而MQ就是邮局,你只要将邮包交给邮局,写好收件人信息,那么邮局就会将你的邮包正确投递给收件人。
MetaQ的消息在Java客户端里是com.taobao.metamorphosis.Message
类,它主要包括这么几个属性:
- id 一个64位的消息ID,它在同一个MetaQ集群内唯一并且单调递增。也就是说在同一个线程内发送的消息,他们的消息id是顺序递增的。这个id不可设置,它是在服务端帮你自动生成,你只能读取这个id,通过
getId
方法。通常,我们可以利用这个唯一的id来做消息去重。 - topic 也就是消息的主题,如果以我们收看电视为例,就是所谓频道,比如CCAV。
- partition,消息所在的分区,同一个topic还根据配置分为不同的分区,你可以将topic理解为一个上层目录,而分区就是这个目录里的子目录(实际实现也是类似)。发送的消息将存储到这个topic下的某个分区,要查询消息的存储分区,就要通过
getPartition
方法,返回的分区对象属于com.taobao.metamorphosis.cluster.Partition
类,它包含了消息所在的broker id以及具体的分区号码. - data 消息的载体,也就是消息的具体内容,同样以收看电视为例,就是频道的节目内容,以邮递为例,就是邮递的东西。
- attribute 消息的附加属性,一个字符串,可有可无。例如电视可能是高清频道,也可能是3D频道,3D或者高清就是个频道的属性,但不是必须的。你可以通过
hasAttribute
方法来检测消息是否有属性。 - readOnly 消息是否为只读,可将消息设置为只读状态(通过
setReadOnly(true)
),只允许读,不可以更改。但是只读属性只是临时的,你在发送的时候将消息设置为只读,消费者接收到的消息仍然是一个可变的消息,也就是说这个状态不会被服务端持久化(persistence). - rollback 消息是否回滚。这只对消费者有意义,消费者可以主动回滚一个消息,通过
setRollbackOnly(true)
。那么这个消息将会被重新消费。这个状态也是临时的,下次消费这条消息的时候,它仍然为false。比喻来说,这类似电视频道的重放功能,你想将当前播放的节目回滚保存,因为现在你不想看,待会你想重新从这个节目开始看起。
这些属性一般都有相应的getter/setter
方法用于获取和设置。
我们通过一些例子来熟悉这些属性。
创建不带属性的消息:
final String topic = "meta-test";
final byte [] data = "hello world".getBytes();
final Message msg = new Message(topic, data);
data是作为消息载体就是一个byte数组,你可以将你的消息内容使用任何序列化方式序列化为byte数组,比如最常见的Java序列化。 topic就是一个字符串,你想发送的topic必须在Broker端配置后才可以发送。如何发送任意Topic请看FAQ一节。
创建带属性的消息
final String topic = "meta-test";
final byte [] data = "hello world".getBytes();
final String attribute = "attribute";
final Message msg = new Message(topic, data, attribute);
System.out.println(msg.hasAttribute()); //true
消息属性只是一个简单的字符串,通常你可以利用消息属性来传递一些额外信息给生产者或者消费者,用来做分区选择或者消息过滤。
msg.setReadOnly(true);
System.out.println(msg.isReadOnly()); //true
msg.setTopic(newTopic); //throw IllegalStateException.
msg.setReadOnly(false);
msg.setTopic(newTopic); //set new topic successfully.
将消息设置为只读后,再尝试修改消息的topic或者data,都会抛出IllegalStateException
异常。一般你会将消息设置为只读来防止消息的意外修改。
SendResult rt = producer.send(msg);
long id = msg.getId();
Partition part = msg.getPartition();
int brokerId = part.getBrokerId();
int partition = part.getPartition();
发送后,MessageProducer
会自动地将服务端返回的id和分区信息填入message。
消息的回滚将在消息的消费者一节介绍。