Skip to content

Java messageconsumer

dennis zhuang edited this page Jun 2, 2013 · 10 revisions

本指南以1.4.5版本的java客户端为起点编写。

在消息生产者发送消息成功后,消息就存储在服务端的数据目录里(服务端dataPath配置的目录),按照topic-partition的目录格式存储。接下来,消息的消费者就需要消费这些消息,这一节就是讲述如何消费某个topic下的消息。 #配置消费者 每个Java的消费者都需要一个ConsumerConfig的配置实例。 ##消费者分组 在MetaQ里,消费者被认为是一个集群,也就是说认为是有一组的机器在共同分担消费一个topic。因此消费者配置ConsumerConfig中最重要的配置是group,每个消费者都必须告诉MetaQ它属于哪个group,然后MetaQ会找出这个group下所有注册上来的消费者,在他们之间做负载均衡,共同消费一个或多个topic。注意,不同group之间可以认为是不同的消费者,他们消费同一个topic下的消息的进度是不同。

举例来说,假设你有一个topic为business-logs,是所有业务系统的日志。然后现在你对这些日志要做两个事情:一个是存储到HDFS这样的分布式文件系统,以便后续做分析处理;以个是Twitter Storm这样的实时分析系统,做实时的数据分析、告警和展现。显然,这里你就需要两个group,比如我们有一个group叫hdfs-writer,它有三台机器同时消费business-logs,将日志存储到HDFS集群。同时,你也有另一个group叫storm-spouts,有5台机器用来给storm集群喂数据。这两个group是隔离,虽然是消费同一个group,但是两者是消费进度(消费了多少个消息,等待消费多少个消息等信息)是不同的。但是同一个group内,例如hdfs-writer的三台机器,这三台机器是共同消费business-logs下的消息,同一条消息只会被这hdfs-writer三台机器中的一台处理,但是这条消息还会被twitter-spouts等其他分组内的某一台机器消费。

##创建ConsumerConfig

创建ConsumerConfig并传入分组名称:

final String group = "hdfs-writer";
ConsumerConfig consumerConfig = new ConsumerConfig(group);

ConsumerConfig的其他重要选项还包括:

  • fetchRunnerCount, 因为MetaQ的消费者是以pull模型来从服务端拉取数据并消费,这个参数设置并行拉取的线程数,默认是CPUs个。
  • fetchTimeoutInMills,同步抓取的请求超时,默认10秒,通常不需要修改此参数。
  • consumerId, 单个消费者的id,必须全局唯一,通常用于标识分组内的单个消费者,可不设置,系统会根据IP和时间戳自动生成。
  • offset, 第一次消费开始位置的offset,默认都是从服务端的最早数据开始消费。
  • commitOffsetPeriodInMills, 保存消费者已经消费的数据的offset的间隔时间,默认5秒。更大的间隔,在故障和重启时间可能重复消费的消息更多,更小的间隔,可能给存储造成压力。
  • maxFetchRetries,同一条消息在处理失败情况下最大重试消费次数,默认5次,超过就跳过这条消息并调用RejectConsumptionHandler处理。关于RejectConsumptionHandler请看下面的拒绝处理小节。

#创建消费者

通过MessageSessionFactorycreateConsumer方法即可创建消费者:

 final MessageConsumer consumer = sessionFactory.createConsumer(consumerConfig);

##Offset的存储

MetaQ的消费模型是一种拉取的模型,消费者根据上次消费数据的绝对偏移量(offset)从服务端的数据文件中拉取后面的数据继续消费,因此这个offset信息就非常关键,需要可靠地保存。默认情况下,MetaQ是将offset信息保存在你使用的zookeeper集群上,也就是ZkOffsetStorage所做的事情,它实现了OffsetStorage接口。通常这样的保存是可靠并且安全的,但是有时候可能你也需要其他选项,目前还提供两个不同的OffsetStorage实现:

  • LocalOffsetStorage,使用consumer的本地文件作为offset存储,默认存储在${HOME}/.meta_offsets的文件里。适合消费者分组只有一个消费者的情况,无需共享offset信息。例如广播类型的消费者就特别合适。
  • MysqlOffsetStorage,使用Mysql作为offset存储,使用前需要创建表结构:
CREATE TABLE `meta_topic_partition_group_offset` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `topic` varchar(255) NOT NULL,
  `partition` varchar(255) NOT NULL,
  `group_id` varchar(255) NOT NULL,
  `offset` int(11) NOT NULL,
  `msg_id` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `TOPIC_PART_GRP_IDX` (`topic`,`partition`,`group_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

你也可以实现自己的OffsetStorage存储。如果你想使用除了zookeeper之外的offset存储,可以在创建消费者的时候传入:

final MessageConsumer consumer = sessionFactory.createConsumer(consumerConfig,new MysqlOffsetStorage(dataSource));

mysql存储需要传入JDBC数据源。

#订阅Topic

##MessageListener

#处理消息

##并发处理

##回滚消息

##错误处理

##中断处理

##拒绝处理器

#关闭

Clone this wiki locally