Skip to content

Commit

Permalink
Allow providing custom Kafka properties
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Spector committed Jan 5, 2017
1 parent c2709cf commit 10fe128
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 14 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.ruckuswireless.pentaho.kafka.producer;

import java.util.Arrays;
import java.util.Properties;
import java.util.TreeSet;

import org.eclipse.swt.SWT;
import org.eclipse.swt.custom.CCombo;
Expand Down Expand Up @@ -256,11 +258,15 @@ private void getData(KafkaProducerMeta producerMeta, boolean copyStepname) {
wMessageField.setText(Const.NVL(producerMeta.getMessageField(), ""));
wKeyField.setText(Const.NVL(producerMeta.getKeyField(), ""));

TreeSet<String> propNames = new TreeSet<String>();
propNames.addAll(Arrays.asList(KafkaProducerMeta.KAFKA_PROPERTIES_NAMES));
propNames.addAll(producerMeta.getKafkaProperties().stringPropertyNames());

Properties kafkaProperties = producerMeta.getKafkaProperties();
for (int i = 0; i < KafkaProducerMeta.KAFKA_PROPERTIES_NAMES.length; ++i) {
String propName = KafkaProducerMeta.KAFKA_PROPERTIES_NAMES[i];
int i = 0;
for (String propName : propNames) {
String value = kafkaProperties.getProperty(propName);
TableItem item = new TableItem(wProps.table, i > 1 ? SWT.BOLD : SWT.NONE);
TableItem item = new TableItem(wProps.table, i++ > 1 ? SWT.BOLD : SWT.NONE);
int colnr = 1;
item.setText(colnr++, Const.NVL(propName, ""));
String defaultValue = KafkaProducerMeta.KAFKA_PROPERTIES_DEFAULTS.get(propName);
Expand All @@ -269,6 +275,7 @@ private void getData(KafkaProducerMeta producerMeta, boolean copyStepname) {
}
item.setText(colnr++, Const.NVL(value, defaultValue));
}

wProps.removeEmptyRows();
wProps.setRowNums();
wProps.optWidth(true);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.ruckuswireless.pentaho.kafka.producer;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -138,10 +140,13 @@ public void loadXML(Node stepnode, List<DatabaseMeta> databases, Map<String, Cou
messageField = XMLHandler.getTagValue(stepnode, "FIELD");
keyField = XMLHandler.getTagValue(stepnode, "KEYFIELD");
Node kafkaNode = XMLHandler.getSubNode(stepnode, "KAFKA");
for (String name : KAFKA_PROPERTIES_NAMES) {
String value = XMLHandler.getTagValue(kafkaNode, name);
if (value != null) {
kafkaProperties.put(name, value);
String[] kafkaElements = XMLHandler.getNodeElements(kafkaNode);
if (kafkaElements != null) {
for (String propName : kafkaElements) {
String value = XMLHandler.getTagValue(kafkaNode, propName);
if (value != null) {
kafkaProperties.put(propName, value);
}
}
}
} catch (Exception e) {
Expand All @@ -161,7 +166,7 @@ public String getXML() throws KettleException {
retval.append(" ").append(XMLHandler.addTagValue("KEYFIELD", keyField));
}
retval.append(" ").append(XMLHandler.openTag("KAFKA")).append(Const.CR);
for (String name : KAFKA_PROPERTIES_NAMES) {
for (String name : kafkaProperties.stringPropertyNames()) {
String value = kafkaProperties.getProperty(name);
if (value != null) {
retval.append(" " + XMLHandler.addTagValue(name, value));
Expand All @@ -177,6 +182,11 @@ public void readRep(Repository rep, ObjectId stepId, List<DatabaseMeta> database
topic = rep.getStepAttributeString(stepId, "TOPIC");
messageField = rep.getStepAttributeString(stepId, "FIELD");
keyField = rep.getStepAttributeString(stepId, "KEYFIELD");
String kafkaPropsXML = rep.getStepAttributeString(stepId, "KAFKA");
if (kafkaPropsXML != null) {
kafkaProperties.loadFromXML(new ByteArrayInputStream(kafkaPropsXML.getBytes()));
}
// Support old versions:
for (String name : KAFKA_PROPERTIES_NAMES) {
String value = rep.getStepAttributeString(stepId, name);
if (value != null) {
Expand All @@ -199,12 +209,9 @@ public void saveRep(Repository rep, ObjectId transformationId, ObjectId stepId)
if (keyField != null) {
rep.saveStepAttribute(transformationId, stepId, "KEYFIELD", keyField);
}
for (String name : KAFKA_PROPERTIES_NAMES) {
String value = kafkaProperties.getProperty(name);
if (value != null) {
rep.saveStepAttribute(transformationId, stepId, name, value);
}
}
ByteArrayOutputStream buf = new ByteArrayOutputStream();
kafkaProperties.storeToXML(buf, null);
rep.saveStepAttribute(transformationId, stepId, "KAFKA", buf.toString());
} catch (Exception e) {
throw new KettleException("KafkaProducerMeta.Exception.saveRep", e);
}
Expand Down

0 comments on commit 10fe128

Please sign in to comment.