As shown in the architecture diagram above, a typical Kafka deployment consists of several Producers (server logs, business data, page views generated by the front end, and so on), several brokers (Kafka scales horizontally; more brokers generally means higher cluster throughput), several Consumers (Consumer Groups), and a ZooKeeper cluster. Kafka uses ZooKeeper to manage cluster configuration, elect leaders, and rebalance when a consumer group changes. Producers push messages to brokers; Consumers pull messages from brokers to subscribe and consume.
Glossary

| Name | Description |
| --- | --- |
| Broker | A message-broker node. A single Kafka server is one broker; one or more brokers form a Kafka cluster. |
| Topic | Kafka categorizes messages by topic; every message published to a Kafka cluster must specify a topic. |
| Producer | A message producer: a client that sends messages to brokers. |
| Consumer | A message consumer: a client that reads messages from brokers. |
| ConsumerGroup | Each Consumer belongs to a specific Consumer Group. A message can be delivered to multiple Consumer Groups, but within a single Consumer Group only one Consumer will consume it. |
| Partition | A physical concept: a topic is split into one or more partitions, and messages within each partition are ordered. |
Topic & Partition
A topic can be thought of as a category of messages. Each topic is divided into multiple partitions, and each partition is stored as an append-only log file. Any message published to a partition is appended to the tail of its log file; the position of each message in the file is called its offset, a long value that uniquely identifies the message within the partition. Because every message is appended to its partition, writes are sequential disk writes, which is very efficient (sequential disk writes have been measured to be faster than random writes to memory, and this is an important reason for Kafka's high throughput).
When a message is sent to a broker, a partitioning rule decides which partition it is stored in. If the partitioning rule is set up well, messages are distributed evenly across partitions, which is what makes horizontal scaling possible. (If a topic corresponded to a single file, the I/O of the machine holding that file would become the topic's bottleneck; partitions solve this problem.) The default number of partitions for newly created topics is set via num.partitions in $KAFKA_HOME/config/server.properties, and the count can also be specified per topic at creation time.
When sending a message you can give it a key; the producer uses that key, together with the configured partitioning logic, to decide which partition the message is written to. The partitioning logic is pluggable via the producer's partitioner.class parameter, which must name a class implementing the partitioner interface (kafka.producer.Partitioner for the old Scala producer, org.apache.kafka.clients.producer.Partitioner for the Java client).
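As an illustration, here is a minimal sketch of a custom partitioner for the Java client that routes records by a murmur2 hash of the key; the class name and the fallback behavior for keyless records are assumptions made for this sketch, not part of Kafka's defaults:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.utils.Utils;

// A sketch of a custom partitioner: send every record with the same key
// to the same partition by hashing the serialized key.
public class KeyHashPartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed for this sketch.
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        int numPartitions = cluster.partitionsForTopic(topic).size();
        if (keyBytes == null) {
            return 0; // keyless records all go to partition 0 in this sketch
        }
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }

    @Override
    public void close() {
        // Nothing to clean up.
    }
}
```

To use it, register the class on the producer, e.g. `props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, KeyHashPartitioner.class.getName())`.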
Kafka only provides a total order over records within a partition, not between different partitions in a topic. See the next chapter for a more detailed discussion.
Important producer parameters are listed below (a configuration sketch follows the table); see https://kafka.apache.org/0102/documentation.html#producerconfigs for the complete list.
NAME | DESCRIPTION | TYPE | DEFAULT | VALID VALUES | IMPORTANCE |
---|---|---|---|---|---|
acks | The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed. acks=0: the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1. acks=1: the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case, should the leader fail immediately after acknowledging the record but before the followers have replicated it, the record will be lost. acks=all (-1): the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. | string | 1 | [all, -1, 0, 1] | high |
buffer.memory | The total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will block for max.block.ms after which it will throw an exception. This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests. | long | 33554432 | [0,...] | high |
retries | Setting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries without setting max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first. | int | 0 | [0,...,2147483647] | high |
batch.size | The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. No attempt will be made to batch records larger than this size. Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent. A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records. | int | 16384 | [0,...] | medium |
linger.ms | The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay: rather than immediately sending out a record, the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absence of load. | long | 0 | [0,...] | medium |
max.block.ms | The configuration controls how long KafkaProducer.send() and KafkaProducer.partitionsFor() will block. These methods can be blocked either because the buffer is full or because metadata is unavailable. Blocking in the user-supplied serializers or partitioner will not be counted against this timeout. | long | 60000 | [0,...] | medium |
request.timeout.ms | The configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted. This should be larger than replica.lag.time.max.ms (a broker configuration) to reduce the possibility of message duplication due to unnecessary producer retries. | int | 30000 | [0,...] | medium |
timeout.ms | The configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request. | int | 30000 | [0,...] | medium |
block.on.buffer.full | When our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default this setting is false and the producer will no longer throw a BufferExhaustedException but instead will use the max.block.ms value to block, after which it will throw a TimeoutException. Setting this property to true will set max.block.ms to Long.MAX_VALUE. Also, if this property is set to true, the parameter metadata.fetch.timeout.ms is no longer honored. This parameter is deprecated and will be removed in a future release; max.block.ms should be used instead. | boolean | false | | low |
max.in.flight.requests.per.connection | The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled). | int | 5 | [1,...] | low |
metadata.fetch.timeout.ms | The first time data is sent to a topic we must fetch metadata about that topic to know which servers host the topic's partitions. This config specifies the maximum time, in milliseconds, for this fetch to succeed before throwing an exception back to the client. | long | 60000 | [0,...] | low |
metadata.max.age.ms | The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions. | long | 300000 | [0,...] | low |
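To make the table concrete, here is a minimal configuration sketch for the Java client wiring up several of the parameters above; the broker address and the specific values are illustrative assumptions, not recommendations:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker list
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        props.put(ProducerConfig.ACKS_CONFIG, "all");              // wait for all in-sync replicas
        props.put(ProducerConfig.RETRIES_CONFIG, 3);               // retry transient send failures
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);        // max bytes per partition batch
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);             // wait up to 5 ms to fill a batch
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432L); // total memory for unsent records
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60000L);     // how long send() may block on a full buffer

        // The producer is thread-safe and is normally shared across an application.
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.close();
    }
}
```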
The Kafka Java client sends messages asynchronously: a record handed to KafkaProducer.send() is placed in an internal send queue, and a background I/O thread continuously pulls records from that queue and transmits them to the brokers. When the send completes, the callback passed to send() is invoked (if no callback was given, nothing is called back).
In other words, the client's send path is asynchronous from end to end.
- The client accumulates records and ships them as a single batched request; a batch is sent once it reaches batch.size bytes or once it has waited longer than linger.ms.
- Also watch for records piling up on the producer side: when the application produces faster than records can be delivered to the broker, messages accumulate in the producer. This backlog is governed mainly by buffer.memory and max.block.ms: buffer.memory sets how much buffer memory is available, and max.block.ms is how long a send may block once the buffer is full before an exception is thrown.
- Kafka batches by default, so a record may sit in the accumulator briefly before it is transmitted; linger.ms (default 0) bounds that artificial delay if records must go out as soon as possible after send(). max.in.flight.requests.per.connection and retries matter mainly when message order must be preserved: in ordered scenarios, set max.in.flight.requests.per.connection = 1 so that retries cannot reorder batches (see the sketch after this list).
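The following sketch shows the asynchronous send path with a callback, configured for the ordered-delivery case described above; the topic name, key, and broker address are assumptions made for illustration:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class OrderedAsyncSendSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker list
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Ordering-sensitive settings: retry transient failures, but allow only
        // one in-flight request per connection so retries cannot reorder batches.
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 0); // flush batches without artificial delay

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // "demo-topic" is a hypothetical topic used for illustration.
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("demo-topic", "key-1", "value-1");
            // send() only enqueues the record and returns immediately; the callback
            // runs on the producer's I/O thread once the broker has responded.
            producer.send(record, (metadata, exception) -> {
                if (exception != null) {
                    exception.printStackTrace();
                } else {
                    System.out.printf("stored in partition %d at offset %d%n",
                            metadata.partition(), metadata.offset());
                }
            });
            producer.flush(); // block until the queued record has actually been sent
        }
    }
}
```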
Availability and Durability Guarantees

When writing to Kafka, producers can choose whether they wait for the message to be acknowledged by 0, 1 or all (-1) replicas. Note that "acknowledgement by all replicas" does not guarantee that the full set of assigned replicas have received the message. By default, when acks=all, acknowledgement happens as soon as all the current in-sync replicas have received the message. For example, if a topic is configured with only two replicas and one fails (i.e., only one in-sync replica remains), then writes that specify acks=all will succeed. However, these writes could be lost if the remaining replica also fails. Although this ensures maximum availability of the partition, this behavior may be undesirable to some users who prefer durability over availability. Therefore, we provide two topic-level configurations that can be used to prefer message durability over availability:
- Disable unclean leader election - if all replicas become unavailable, then the partition will remain unavailable until the most recent leader becomes available again. This effectively prefers unavailability over the risk of message loss. See the previous section on Unclean Leader Election for clarification.
- Specify a minimum ISR size - the partition will only accept writes if the size of the ISR is above a certain minimum, in order to prevent the loss of messages that were written to just a single replica, which subsequently becomes unavailable. This setting only takes effect if the producer uses acks=all and guarantees that the message will be acknowledged by at least this many in-sync replicas. This setting offers a trade-off between consistency and availability. A higher setting for minimum ISR size guarantees better consistency since the message is guaranteed to be written to more replicas which reduces the probability that it will be lost. However, it reduces availability since the partition will be unavailable for writes if the number of in-sync replicas drops below the minimum threshold.
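As a sketch of how these two topic-level settings can be applied, the snippet below uses the Java AdminClient, which is available from Kafka 0.11 onward (slightly newer than the 0.10.2 documentation linked above); the topic name and broker address are assumptions:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class DurabilityOverAvailabilitySketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker list

        try (AdminClient admin = AdminClient.create(props)) {
            // "orders" is a hypothetical topic used for illustration.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "orders");
            Config durability = new Config(Arrays.asList(
                    // Reject writes unless at least 2 replicas are in sync
                    // (only enforced for producers that use acks=all).
                    new ConfigEntry("min.insync.replicas", "2"),
                    // Never elect an out-of-sync replica as leader: prefer
                    // unavailability over the risk of losing acknowledged messages.
                    new ConfigEntry("unclean.leader.election.enable", "false")));
            admin.alterConfigs(Collections.singletonMap(topic, durability)).all().get();
        }
    }
}
```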