Add to your pom.xml
<dependency>
<groupId>io.github.amerousful</groupId>
<artifactId>gatling-kafka</artifactId>
<version>3.1</version>
</dependency>
Add to your build.sbt
libraryDependencies += "io.github.amerousful" % "gatling-kafka" % "3.1"
Import:
import io.gatling.core.Predef._
import io.github.amerousful.kafka.Predef._
import scala.concurrent.duration._
Protocol:
val kafkaProtocol = kafka
.broker(KafkaBroker("localhost", 9092))
.acks("1")
.producerIdenticalSerializer("org.apache.kafka.common.serialization.StringSerializer")
.consumerIdenticalDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
.replyTimeout(10.seconds)
.matchByKey()
Fire and forget:
val kafkaFireAndForget = kafka("Kafka: fire and forget")
.send
.topic("input_topic")
.payload(StringBody("#{payload}"))
.key("#{key}")
.headers(Map(
"header_1" -> "#{h_value_1}",
"header_2" -> "#{h_value_2}",
))
Request and reply:
val kafkaRequestWithReply = kafka("Kafka: request with reply")
.requestReply
.topic("input_topic")
.payload("""{ "m": "#{payload}" }""")
.replyTopic("output_topic")
.key("#{id} #{key}")
.check(jsonPath("$.m").is("#{payload}_1"))
Scenario:
val scn = scenario("Kafka Scenario")
.exec(kafkaFireAndForget)
Inject:
setUp(
scn.inject(
constantUsersPerSec(2).during(10.seconds)
).protocols(kafkaProtocol)
)
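The fragments above can be assembled into a single simulation. The following is a minimal sketch, assuming Gatling's Scala DSL and this library are on the classpath. The class name `KafkaFireAndForgetSimulation` and the feeder with its generated values are illustrative and not part of the original example; headers are left out to keep it short.

```scala
import java.util.UUID
import scala.concurrent.duration._

import io.gatling.core.Predef._
import io.github.amerousful.kafka.Predef._

// Hypothetical class name; the Kafka DSL calls are the ones shown above.
class KafkaFireAndForgetSimulation extends Simulation {

  val kafkaProtocol = kafka
    .broker(KafkaBroker("localhost", 9092))
    .acks("1")
    .producerIdenticalSerializer("org.apache.kafka.common.serialization.StringSerializer")
    .consumerIdenticalDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
    .replyTimeout(10.seconds)
    .matchByKey()

  // Illustrative feeder that fills the #{key} and #{payload} session attributes.
  val feeder = Iterator.continually(
    Map("key" -> UUID.randomUUID().toString, "payload" -> "hello")
  )

  val kafkaFireAndForget = kafka("Kafka: fire and forget")
    .send
    .topic("input_topic")
    .payload(StringBody("#{payload}"))
    .key("#{key}")

  val scn = scenario("Kafka Scenario")
    .feed(feeder)
    .exec(kafkaFireAndForget)

  setUp(
    scn.inject(
      constantUsersPerSec(2).during(10.seconds)
    ).protocols(kafkaProtocol)
  )
}
```

The feeder is only one way to populate the session attributes; any Gatling feeder or an earlier `exec` that sets `key` and `payload` would serve the same purpose.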
There are three ways to load-test Kafka:
- Fire-and-forget: send a message to a topic without waiting for a reply.
kafka("Kafka: fire and forget")
.send
...
- Request-reply: send a message to an input topic, then wait for the resulting message on an output topic.
kafka("Kafka: request with reply")
.requestReply
...
.replyTopic("output_topic")
- Only consume.
kafka("Kafka: Only consume")
.onlyConsume
.readTopic("#{output_topic}")
.payloadForTracking {
"payload"
}
.keyForTracking("key")
.startTime("#{currentTimeMillis()}")
It doesn't send any message, but because it still has to match incoming messages, it provides tracking methods similar to those of request-reply: payloadForTracking, keyForTracking, and header(s)ForTracking.
It can also be useful to track when a message was triggered and to measure the elapsed time of its entire processing chain. For this, the startTime method accepts a specific start time in milliseconds. A typical flow:
-- Send HTTP request
-- Write the start point into Session
-- Consume a message from Kafka
If startTime is not passed, it defaults to the current time.
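The three-step flow above could look roughly like the sketch below. It assumes the Gatling HTTP module is also on the classpath. The base URL, the `/orders` endpoint, the request body, the tracking key `order-1`, the session attribute name `startedAt`, and the class name are all hypothetical; the Kafka calls and the protocol settings are the ones shown earlier in this README.

```scala
import scala.concurrent.duration._

import io.gatling.core.Predef._
import io.gatling.http.Predef._
import io.github.amerousful.kafka.Predef._

// Hypothetical simulation: an HTTP call triggers processing, Kafka delivers the result.
class ConsumeAfterHttpSimulation extends Simulation {

  // Hypothetical service URL.
  val httpProtocol = http.baseUrl("http://localhost:8080")

  val kafkaProtocol = kafka
    .broker(KafkaBroker("localhost", 9092))
    .acks("1")
    .producerIdenticalSerializer("org.apache.kafka.common.serialization.StringSerializer")
    .consumerIdenticalDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
    .replyTimeout(10.seconds)
    .matchByKey()

  val scn = scenario("HTTP trigger, Kafka consume")
    // 1. Send the HTTP request (hypothetical endpoint and body).
    .exec(
      http("Trigger order")
        .post("/orders")
        .body(StringBody("""{ "id": "order-1" }"""))
    )
    // 2. Write the start point into the Session, in milliseconds.
    .exec(session => session.set("startedAt", System.currentTimeMillis()))
    // 3. Consume the message from Kafka; elapsed time is counted from startedAt.
    .exec(
      kafka("Kafka: Only consume")
        .onlyConsume
        .readTopic("output_topic")
        .payloadForTracking("payload")
        .keyForTracking("order-1")
        .startTime("#{startedAt}")
    )

  setUp(
    scn.inject(atOnceUsers(1)).protocols(httpProtocol, kafkaProtocol)
  )
}
```

Whether the start point is recorded before or after the HTTP call decides whether the HTTP round trip is included in the reported time; the sketch follows the order listed above.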
With request-reply, you have to define the reply waiting time in the protocol:
.replyTimeout(10.seconds)
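To make the role of the timeout concrete, here is a small conceptual model in plain Scala: each sent request is remembered under an identifier until a reply with the same identifier arrives or the timeout elapses. `ReplyTracker` and its methods are invented for illustration and are not the library's internals.

```scala
import scala.collection.mutable

// Conceptual model only; not how gatling-kafka is implemented.
final class ReplyTracker(timeoutMillis: Long) {
  // matchId -> time the request was sent (milliseconds)
  private val pending = mutable.Map.empty[String, Long]

  def sent(matchId: String, nowMillis: Long): Unit =
    pending.update(matchId, nowMillis)

  // Elapsed time if the reply belongs to a pending request and arrived in time.
  def received(matchId: String, nowMillis: Long): Option[Long] =
    pending.remove(matchId)
      .map(sentAt => nowMillis - sentAt)
      .filter(_ <= timeoutMillis)

  // Requests that have waited longer than the timeout; they are dropped from tracking.
  def timedOut(nowMillis: Long): Set[String] = {
    val expired = pending.collect {
      case (id, sentAt) if nowMillis - sentAt > timeoutMillis => id
    }.toSet
    pending --= expired
    expired
  }
}
```

With a 10-second timeout, a reply arriving 250 ms after the request is reported with an elapsed time of 250 ms, while a request still unanswered after 10 seconds is reported as timed out.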
You also have to specify how a request is matched to its reply. There are several options:
- matchByKey()
- matchByValue()
- Custom matcher:
object CustomMatcher extends KafkaMatcher {
override def requestMatchId(msg: ProducerRecord[String, String]): String = ???
override def responseMatchId(msg: ConsumerRecord[String, String]): String = ???
}
...
.messageMatcher(CustomMatcher)
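The two `???` bodies must return the same string for a request and its reply. One possible approach, sketched here in plain Scala, is to pull a shared identifier out of the JSON payload on both sides. `MatchIds`, `fromJsonField`, and the field name `id` are hypothetical, and the regex is a deliberately naive stand-in for a real JSON parser.

```scala
import java.util.regex.Pattern

object MatchIds {
  // Naive extraction of a top-level string field such as "id": "42".
  // It does not handle escaped quotes or nested objects.
  def fromJsonField(payload: String, field: String): Option[String] = {
    val pattern = ("\"" + Pattern.quote(field) + "\"\\s*:\\s*\"([^\"]*)\"").r
    pattern.findFirstMatchIn(payload).map(_.group(1))
  }
}

// Possible wiring inside CustomMatcher (falling back to "" is an arbitrary choice):
//   override def requestMatchId(msg: ProducerRecord[String, String]): String =
//     MatchIds.fromJsonField(msg.value(), "id").getOrElse("")
//   override def responseMatchId(msg: ConsumerRecord[String, String]): String =
//     MatchIds.fromJsonField(msg.value(), "id").getOrElse("")
```

Keeping the extraction in a separate helper makes it easy to check the matching rule without a running broker.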
send ->
topic() ->
payload() ->
key() / headers()
requestReply ->
topic() ->
payload() ->
replyTopic() ->
key() / headers() / check() / protobufOutput()
onlyConsume ->
readTopic() ->
payloadForTracking() ->
keyForTracking() / headerForTracking() / check() / startTime() / protobufOutput()
- Static name:
.replyConsumerName("gatling-test-consumer")
- If you don't define a static name, one is generated from the pattern
gatling-test-${java.util.UUID.randomUUID()}
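The naming rule can be summarised in a few lines of plain Scala. This is a sketch of the behaviour described above, not the library's code; `ConsumerNames.resolve` is a made-up helper.

```scala
import java.util.UUID

object ConsumerNames {
  // Use the static name when one is given, otherwise generate a unique one.
  def resolve(staticName: Option[String]): String =
    staticName.getOrElse(s"gatling-test-${UUID.randomUUID()}")
}
```

A static name keeps the consumer identity stable across runs, while a generated one gives every run a fresh identity.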
Add to your logback.xml:
<logger name="io.github.amerousful.kafka" level="ALL"/>
Protobuf payloads are supported starting from version 3.0. All examples below assume Scala with Maven. The feature works with classes generated by ScalaPB.
Instructions:
- Create a .proto file.
src/
└── test/
└── resources/
└── protobuf/
└── service.proto
service.proto
syntax = "proto3";
package proto;
message AuthLocal {
string token = 1;
int32 id = 2;
string email = 3;
}
message Order {
string name = 1;
}
- Add the Protobuf class generator to the pom.xml.
Important! You have to define output for both Java and Scala.
...
<build>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>com.github.os72</groupId>
<artifactId>protoc-jar-maven-plugin</artifactId>
<version>3.11.4</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<addProtoSources>all</addProtoSources>
<includeMavenTypes>transitive</includeMavenTypes>
<inputDirectories>
<include>src/test/</include>
</inputDirectories>
<outputTargets>
<outputTarget>
<type>java</type>
<addSources>test</addSources>
<outputDirectory>${project.basedir}/target/generated-sources/protobuf
</outputDirectory>
<pluginArtifact>com.thesamet.scalapb:protoc-gen-scala:0.11.15:sh:unix
</pluginArtifact>
</outputTarget>
<outputTarget>
<type>scalapb</type>
<addSources>test</addSources>
<outputDirectory>${project.basedir}/target/generated-sources/protobuf
</outputDirectory>
<outputOptions>java_conversions</outputOptions>
<pluginArtifact>com.thesamet.scalapb:protoc-gen-scala:0.11.15:sh:unix
</pluginArtifact>
</outputTarget>
</outputTargets>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
...
- Run in a console:
mvn generate-sources
- Generated classes will be in the path
/target/generated-sources/protobuf
- Import the generated class for use in requests and scenarios (it is both a Scala class and an object):
import proto.service.AuthLocal
- Pass Scala case classes as payloads. They are implicitly converted from Scala to Java.
.payload(
// Scala case class with all fields
AuthLocal("myToken", 123, "[email protected]")
)
.payload { session =>
val token = session("token").as[String]
AuthLocal(token, 123, "[email protected]")
}
- Deserialize replies into Protobuf and work with the resulting objects in check.
.protobufOutput(AuthLocal)
.check(
protobufResponse((auth: AuthLocal) => auth.id) is 123
)
- Protocol.
.consumerKeyDeserializer("org.apache.kafka.common.serialization.StringDeserializer")
.producerKeySerializer("org.apache.kafka.common.serialization.StringSerializer")
.producerValueSerializer("io.confluent.kafka.serializers.protobuf.KafkaProtobufSerializer")
.schemaUrl("http://localhost:8085")
As you can see, there is no need to pass a Protobuf deserializer for the consumer value, because it is resolved by the protobufOutput method.
Pull requests are welcome. For major changes, please open an issue first to discuss what you would like to change.