Skip to content

Commit

Permalink
2 test classes for Kafka test (ConsumerPerf and ProducerPerf)
Browse files Browse the repository at this point in the history
  • Loading branch information
antoineauger committed Jul 12, 2017
1 parent db3d9fb commit 810d785
Show file tree
Hide file tree
Showing 2 changed files with 604 additions and 0 deletions.
303 changes: 303 additions & 0 deletions src/test/ConsumerPerformance.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.tools

import java.util

import scala.collection.JavaConverters._
import java.util.concurrent.atomic.AtomicLong
import java.nio.channels.ClosedByInterruptException

import org.apache.log4j.Logger
import org.apache.kafka.clients.consumer.{ConsumerRebalanceListener, KafkaConsumer}
import org.apache.kafka.common.serialization.ByteArrayDeserializer
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.TopicPartition
import kafka.utils.CommandLineUtils
import java.util.{Collections, Properties, Random}

import kafka.consumer.Consumer
import kafka.consumer.ConsumerConnector
import kafka.consumer.KafkaStream
import kafka.consumer.ConsumerTimeoutException
import java.text.SimpleDateFormat
import java.util.concurrent.atomic.AtomicBoolean

/**
* Performance test for the full zookeeper consumer
*/
object ConsumerPerformance {
private val logger = Logger.getLogger(getClass())

def main(args: Array[String]): Unit = {
println("###### Modified ConsumerPerformance for iQAS evaluation ######");

val config = new ConsumerPerfConfig(args)
logger.info("Starting consumer...")
val totalMessagesRead = new AtomicLong(0)
val totalBytesRead = new AtomicLong(0)
val consumerTimeout = new AtomicBoolean(false)

if (!config.hideHeader) {
if (!config.showDetailedStats)
println("start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
else
println("time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
}

var startMs, endMs = 0L
if (!config.useOldConsumer) {
val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](config.props)
consumer.subscribe(Collections.singletonList(config.topic))
startMs = System.currentTimeMillis
consume(consumer, List(config.topic), config.numMessages, 1000, config, totalMessagesRead, totalBytesRead)
endMs = System.currentTimeMillis
consumer.close()
} else {
import kafka.consumer.ConsumerConfig
val consumerConfig = new ConsumerConfig(config.props)
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(Map(config.topic -> config.numThreads))
var threadList = List[ConsumerPerfThread]()
for (streamList <- topicMessageStreams.values)
for (i <- 0 until streamList.length)
threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config, totalMessagesRead, totalBytesRead, consumerTimeout)

logger.info("Sleeping for 1 second.")
Thread.sleep(1000)
logger.info("starting threads")
startMs = System.currentTimeMillis
for (thread <- threadList)
thread.start
for (thread <- threadList)
thread.join
endMs =
if (consumerTimeout.get()) System.currentTimeMillis - consumerConfig.consumerTimeoutMs
else System.currentTimeMillis
consumerConnector.shutdown()
}
val elapsedSecs = (endMs - startMs) / 1000.0
if (!config.showDetailedStats) {
val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
println("%s, %s, %.4f, %.4f, %d, %.4f".format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get, totalMessagesRead.get / elapsedSecs))
}
}

def consume(consumer: KafkaConsumer[Array[Byte], Array[Byte]], topics: List[String], count: Long, timeout: Long, config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong) {
var bytesRead = 0L
var messagesRead = 0L
var lastBytesRead = 0L
var lastMessagesRead = 0L

// Wait for group join, metadata fetch, etc
val joinTimeout = 10000
val isAssigned = new AtomicBoolean(false)
consumer.subscribe(topics.asJava, new ConsumerRebalanceListener {
def onPartitionsAssigned(partitions: util.Collection[TopicPartition]) {
isAssigned.set(true)
}
def onPartitionsRevoked(partitions: util.Collection[TopicPartition]) {
isAssigned.set(false)
}})
val joinStart = System.currentTimeMillis()
while (!isAssigned.get()) {
if (System.currentTimeMillis() - joinStart >= joinTimeout) {
throw new Exception("Timed out waiting for initial group join.")
}
consumer.poll(100)
}
consumer.seekToBeginning(Collections.emptyList())

// Now start the benchmark
val startMs = System.currentTimeMillis
var lastReportTime: Long = startMs
var lastConsumedTime = System.currentTimeMillis
var currentTimeMillis = lastConsumedTime

while (messagesRead < count && currentTimeMillis - lastConsumedTime <= timeout) {
val records = consumer.poll(100).asScala
currentTimeMillis = System.currentTimeMillis
if (records.nonEmpty)
lastConsumedTime = currentTimeMillis
for (record <- records) {
messagesRead += 1
if (record.key != null)
bytesRead += record.key.size
if (record.value != null)
bytesRead += record.value.size

if (currentTimeMillis - lastReportTime >= config.reportingInterval) {
if (config.showDetailedStats)
printProgressMessage(0, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, currentTimeMillis, config.dateFormat)
lastReportTime = currentTimeMillis
lastMessagesRead = messagesRead
lastBytesRead = bytesRead
}
}
}

totalMessagesRead.set(messagesRead)
totalBytesRead.set(bytesRead)
}

def printProgressMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
startMs: Long, endMs: Long, dateFormat: SimpleDateFormat) = {
val elapsedMs: Double = endMs - startMs
val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
println("%s, %d, %.4f, %.4f, %d, %.4f".format(dateFormat.format(endMs), id, totalMBRead,
1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
}

class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over. This option is only used with the old consumer.")
.withRequiredArg
.describedAs("urls")
.ofType(classOf[String])
val bootstrapServersOpt = parser.accepts("broker-list", "REQUIRED (unless old consumer is used): A broker list to use for connecting if using the new consumer.")
.withRequiredArg()
.describedAs("host")
.ofType(classOf[String])
val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val groupIdOpt = parser.accepts("group", "The group id to consume on.")
.withRequiredArg
.describedAs("gid")
.defaultsTo("perf-consumer-" + new Random().nextInt(100000))
.ofType(classOf[String])
val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
.withRequiredArg
.describedAs("size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1024 * 1024)
val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
"offset to consume from, start with the latest message present in the log rather than the earliest message.")
val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the tcp RECV size.")
.withRequiredArg
.describedAs("size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(2 * 1024 * 1024)
val numThreadsOpt = parser.accepts("threads", "Number of processing threads.")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(10)
val numFetchersOpt = parser.accepts("num-fetch-threads", "Number of fetcher threads.")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val newConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation. This is the default.")
val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
.withRequiredArg
.describedAs("config file")
.ofType(classOf[String])

val options = parser.parse(args: _*)

CommandLineUtils.checkRequiredArgs(parser, options, topicOpt, numMessagesOpt)

val useOldConsumer = options.has(zkConnectOpt)

val props = if (options.has(consumerConfigOpt))
Utils.loadProps(options.valueOf(consumerConfigOpt))
else
new Properties
if (!useOldConsumer) {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServersOpt)
import org.apache.kafka.clients.consumer.ConsumerConfig
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, options.valueOf(bootstrapServersOpt))
props.put(ConsumerConfig.GROUP_ID_CONFIG, options.valueOf(groupIdOpt))
props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, options.valueOf(socketBufferSizeOpt).toString)
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, options.valueOf(fetchSizeOpt).toString)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (options.has(resetBeginningOffsetOpt)) "latest" else "earliest")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[ByteArrayDeserializer])
props.put(ConsumerConfig.CHECK_CRCS_CONFIG, "false")
} else {
if (options.has(bootstrapServersOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServersOpt is not valid with $zkConnectOpt.")
else if (options.has(newConsumerOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $newConsumerOpt is not valid with $zkConnectOpt.")
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt, numMessagesOpt)
props.put("group.id", options.valueOf(groupIdOpt))
props.put("socket.receive.buffer.bytes", options.valueOf(socketBufferSizeOpt).toString)
props.put("fetch.message.max.bytes", options.valueOf(fetchSizeOpt).toString)
props.put("auto.offset.reset", if (options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
props.put("zookeeper.connect", options.valueOf(zkConnectOpt))
props.put("consumer.timeout.ms", "1000")
props.put("num.consumer.fetchers", options.valueOf(numFetchersOpt).toString)
}
val numThreads = options.valueOf(numThreadsOpt).intValue
val topic = options.valueOf(topicOpt)
val numMessages = options.valueOf(numMessagesOpt).longValue
val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
if (reportingInterval <= 0)
throw new IllegalArgumentException("Reporting interval must be greater than 0.")
val showDetailedStats = options.has(showDetailedStatsOpt)
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
}

class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Array[Byte], Array[Byte]],
config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong, consumerTimeout: AtomicBoolean)
extends Thread(name) {

override def run() {
var bytesRead = 0L
var messagesRead = 0L
val startMs = System.currentTimeMillis
var lastReportTime: Long = startMs
var lastBytesRead = 0L
var lastMessagesRead = 0L

try {
val iter = stream.iterator
while (iter.hasNext && messagesRead < config.numMessages) {
val messageAndMetadata = iter.next
messagesRead += 1
bytesRead += messageAndMetadata.message.length
val currentTimeMillis = System.currentTimeMillis

if (currentTimeMillis - lastReportTime >= config.reportingInterval) {
if (config.showDetailedStats)
printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, currentTimeMillis, config.dateFormat)
lastReportTime = currentTimeMillis
lastMessagesRead = messagesRead
lastBytesRead = bytesRead
}
}
} catch {
case _: InterruptedException =>
case _: ClosedByInterruptException =>
case _: ConsumerTimeoutException =>
consumerTimeout.set(true)
case e: Throwable => e.printStackTrace()
}
totalMessagesRead.addAndGet(messagesRead)
totalBytesRead.addAndGet(bytesRead)
if (config.showDetailedStats)
printProgressMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis, config.dateFormat)
}

}
}
Loading

0 comments on commit 810d785

Please sign in to comment.