Skip to content
This repository has been archived by the owner on May 27, 2020. It is now read-only.

Commit

Permalink
Merge pull request #147 from Stratio/fix/DECISION-264_Harware_Emulator
Browse files Browse the repository at this point in the history
[DECISION-264] Changed Kafka Producer for producing avro messages
  • Loading branch information
josepablofernandez committed Mar 28, 2016
2 parents 0fec6f5 + a331aee commit 8120792
Showing 1 changed file with 14 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;

public class HardwareEmulatorMain {
Expand All @@ -49,11 +51,11 @@ public class HardwareEmulatorMain {
public static void main(String[] args) {

int multiplicator = 1;
Producer<String, byte[]> avroProducer = null;
KafkaProducer<String, byte[]> avroProducer = null;

if (args != null && args.length > 1) {
multiplicator = Integer.valueOf(args[0]);
avroProducer = new Producer<>(createProducerConfig(args[1]));
avroProducer = new KafkaProducer<String, byte[]>(createKafkaProducerConfig(args[1]));
} else {
throw new RuntimeException(
"Usage: \n param 1 - data multiplier (default = 1) \n param 2 - kafka broker list");
Expand Down Expand Up @@ -111,26 +113,28 @@ public static void main(String[] args) {
es.shutdown();
}

private static ProducerConfig createProducerConfig(String brokerList) {
private static Properties createKafkaProducerConfig(String brokerList) {
Properties properties = new Properties();

properties.put("metadata.broker.list", brokerList);

properties.put(org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

properties.put(org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.ByteArraySerializer");
properties.put(org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");

return new ProducerConfig(properties);
return properties;

}

private static class DataSender implements Runnable {

private final Producer<String, byte[]> avroProducer;
private final KafkaProducer<String, byte[]> avroProducer;
private final List<Double> values;
private final String name;

public DataSender(Producer<String, byte[]> avroProducer, List<Double> values, String name) {
public DataSender(KafkaProducer<String, byte[]> avroProducer, List<Double> values, String name) {
super();
this.avroProducer = avroProducer;
this.values = values;
Expand All @@ -142,10 +146,10 @@ public void run() {

for (StratioStreamingMessage message : generateStratioStreamingMessages(values, name)) {

KeyedMessage<String, byte[]> busMessage = new KeyedMessage<>(
InternalTopic.TOPIC_DATA.getTopicName(), serializeStratioStreamingMessage(message));

ProducerRecord busMessage = new ProducerRecord(InternalTopic.TOPIC_DATA.getTopicName(), message.getOperation(),
serializeStratioStreamingMessage(message));
avroProducer.send(busMessage);

}

}
Expand Down

0 comments on commit 8120792

Please sign in to comment.