
Commit

Lazy Kafka producer initialization
Michael Spector committed Aug 11, 2014
1 parent 2c08866 commit 5169867
Showing 1 changed file with 42 additions and 30 deletions.
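This commit moves creation of the Kafka producer out of init() and into the first call of processRow(), so the producer (and its broker connections) is only built once the step actually receives a row. Below is a condensed sketch of the resulting pattern, not the full committed class: the field-validation and error-handling branches are dropped, the field lookup is shortened, and the trailing putRow() call is assumed from the usual Kettle step structure rather than visible in this diff.

import java.util.Map.Entry;
import java.util.Properties;

import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
import org.pentaho.di.trans.step.StepDataInterface;
import org.pentaho.di.trans.step.StepInterface;
import org.pentaho.di.trans.step.StepMeta;
import org.pentaho.di.trans.step.StepMetaInterface;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

// Condensed sketch of the lazily initialized producer step; validation and
// error handling from the real class are omitted here.
public class KafkaProducerStep extends BaseStep implements StepInterface {

    public KafkaProducerStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr,
            TransMeta transMeta, Trans trans) {
        super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
    }

    // Note: no init() override any more -- the producer is no longer created up front.

    public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
        KafkaProducerMeta meta = (KafkaProducerMeta) smi;
        KafkaProducerData data = (KafkaProducerData) sdi;

        Object[] r = getRow();
        if (r == null) {
            setOutputDone();
            return false;
        }

        if (first) {
            first = false;
            if (data.producer == null) {
                // Resolve Kettle variables in the user-supplied Kafka properties.
                Properties substProperties = new Properties();
                for (Entry<Object, Object> e : meta.getKafkaProperties().entrySet()) {
                    substProperties.put(e.getKey(), environmentSubstitute(e.getValue().toString()));
                }
                // The producer is built here, on the first row, instead of in init().
                data.producer = new Producer<Object, Object>(new ProducerConfig(substProperties));
            }
            data.outputRowMeta = getInputRowMeta().clone();
            meta.getFields(data.outputRowMeta, getStepname(), null, null, this);

            // Field lookup, condensed (the real step also validates these values).
            String inputField = environmentSubstitute(meta.getField());
            data.inputFieldNr = getInputRowMeta().indexOfValue(inputField);
            data.inputFieldMeta = getInputRowMeta().getValueMeta(data.inputFieldNr);
        }

        byte[] message = data.inputFieldMeta.getBinary(r[data.inputFieldNr]);
        data.producer.send(new KeyedMessage<Object, Object>(environmentSubstitute(meta.getTopic()), message));

        putRow(data.outputRowMeta, r); // assumed: hand the row to the next step, as in a typical Kettle step
        return true;
    }

    public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
        KafkaProducerData data = (KafkaProducerData) sdi;
        if (data.producer != null) { // may never have been created if no rows arrived
            data.producer.close();
            data.producer = null;
        }
        super.dispose(smi, sdi);
    }
}

With lazy initialization, the null check in dispose() is what keeps a step that never received a row from failing on shutdown, since in that case no producer was ever created. The actual diff follows.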
@@ -25,38 +25,23 @@
*/
public class KafkaProducerStep extends BaseStep implements StepInterface {

public KafkaProducerStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
Trans trans) {
public KafkaProducerStep(StepMeta stepMeta,
StepDataInterface stepDataInterface, int copyNr,
TransMeta transMeta, Trans trans) {
super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}

public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
super.init(smi, sdi);

KafkaProducerMeta meta = (KafkaProducerMeta) smi;
KafkaProducerData data = (KafkaProducerData) sdi;

Properties properties = meta.getKafkaProperties();
Properties substProperties = new Properties();
for (Entry<Object, Object> e : properties.entrySet()) {
substProperties.put(e.getKey(), environmentSubstitute(e.getValue().toString()));
}

ProducerConfig producerConfig = new ProducerConfig(substProperties);
logBasic(Messages.getString("KafkaProducerStep.CreateKafkaProducer.Message", producerConfig.brokerList()));
data.producer = new Producer<Object, Object>(producerConfig);
return true;
}

public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
KafkaProducerData data = (KafkaProducerData) sdi;
if (data.producer != null) {
data.producer.close();
data.producer = null;
}
super.dispose(smi, sdi);
}

public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
throws KettleException {
Object[] r = getRow();
if (r == null) {
setOutputDone();
@@ -70,23 +55,44 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K

if (first) {
first = false;

// Initialize Kafka client:
if (data.producer == null) {
Properties properties = meta.getKafkaProperties();
Properties substProperties = new Properties();
for (Entry<Object, Object> e : properties.entrySet()) {
substProperties.put(e.getKey(), environmentSubstitute(e
.getValue().toString()));
}

ProducerConfig producerConfig = new ProducerConfig(
substProperties);
logBasic(Messages.getString(
"KafkaProducerStep.CreateKafkaProducer.Message",
producerConfig.brokerList()));
data.producer = new Producer<Object, Object>(producerConfig);
}

data.outputRowMeta = getInputRowMeta().clone();
meta.getFields(data.outputRowMeta, getStepname(), null, null, this);

String inputField = environmentSubstitute(meta.getField());

int numErrors = 0;
if (Const.isEmpty(inputField)) {
logError(Messages.getString("KafkaProducerStep.Log.FieldNameIsNull")); //$NON-NLS-1$
logError(Messages
.getString("KafkaProducerStep.Log.FieldNameIsNull")); //$NON-NLS-1$
numErrors++;
}
data.inputFieldNr = inputRowMeta.indexOfValue(inputField);
if (data.inputFieldNr < 0) {
logError(Messages.getString("KafkaProducerStep.Log.CouldntFindField", inputField)); //$NON-NLS-1$
logError(Messages.getString(
"KafkaProducerStep.Log.CouldntFindField", inputField)); //$NON-NLS-1$
numErrors++;
}
if (!inputRowMeta.getValueMeta(data.inputFieldNr).isBinary()) {
logError(Messages.getString("KafkaProducerStep.Log.FieldNotValid", inputField)); //$NON-NLS-1$
logError(Messages.getString(
"KafkaProducerStep.Log.FieldNotValid", inputField)); //$NON-NLS-1$
numErrors++;
}
if (numErrors > 0) {
@@ -98,17 +104,21 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
}

try {
byte[] message = data.inputFieldMeta.getBinary(r[data.inputFieldNr]);
byte[] message = data.inputFieldMeta
.getBinary(r[data.inputFieldNr]);
String topic = environmentSubstitute(meta.getTopic());

data.producer.send(new KeyedMessage<Object, Object>(topic, message));

data.producer
.send(new KeyedMessage<Object, Object>(topic, message));
if (isRowLevel()) {
logRowlevel(Messages.getString("KafkaProducerStep.Log.SendingData", topic,
logRowlevel(Messages.getString(
"KafkaProducerStep.Log.SendingData", topic,
data.inputFieldMeta.getString(r[data.inputFieldNr])));
}
} catch (KettleException e) {
if (!getStepMeta().isDoingErrorHandling()) {
logError(Messages.getString("KafkaProducerStep.ErrorInStepRunning", e.getMessage()));
logError(Messages.getString(
"KafkaProducerStep.ErrorInStepRunning", e.getMessage()));
setErrors(1);
stopAll();
setOutputDone();
@@ -119,10 +129,12 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
return true;
}

public void stopRunning(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
public void stopRunning(StepMetaInterface smi, StepDataInterface sdi)
throws KettleException {

KafkaProducerData data = (KafkaProducerData) sdi;
data.producer.close();
data.producer = null;

super.stopRunning(smi, sdi);
}
