diff --git a/examples/addressbook/addressbook.gpb b/examples/addressbook/addressbook.gpb
new file mode 100644
index 0000000..4476576
Binary files /dev/null and b/examples/addressbook/addressbook.gpb differ
diff --git a/examples/addressbook/decode_addressbook.ktr b/examples/addressbook/decode_addressbook.ktr
index 0ccda47..4c9fbcc 100644
--- a/examples/addressbook/decode_addressbook.ktr
+++ b/examples/addressbook/decode_addressbook.ktr
@@ -6,333 +6,33 @@
Normal
- /
+ /
-
-
-
-
-
-
-
-
- ID_BATCH
- Y
- ID_BATCH
-
-
- CHANNEL_ID
- Y
- CHANNEL_ID
-
-
- TRANSNAME
- Y
- TRANSNAME
-
-
- STATUS
- Y
- STATUS
-
-
- LINES_READ
- Y
- LINES_READ
-
-
-
- LINES_WRITTEN
- Y
- LINES_WRITTEN
-
-
-
- LINES_UPDATED
- Y
- LINES_UPDATED
-
-
-
- LINES_INPUT
- Y
- LINES_INPUT
-
-
-
- LINES_OUTPUT
- Y
- LINES_OUTPUT
-
-
-
- LINES_REJECTED
- Y
- LINES_REJECTED
-
-
-
- ERRORS
- Y
- ERRORS
-
-
- STARTDATE
- Y
- STARTDATE
-
-
- ENDDATE
- Y
- ENDDATE
-
-
- LOGDATE
- Y
- LOGDATE
-
-
- DEPDATE
- Y
- DEPDATE
-
-
- REPLAYDATE
- Y
- REPLAYDATE
-
-
- LOG_FIELD
- Y
- LOG_FIELD
-
-
-
-
-
-
-
-
-
- ID_BATCH
- Y
- ID_BATCH
-
-
- SEQ_NR
- Y
- SEQ_NR
-
-
- LOGDATE
- Y
- LOGDATE
-
-
- TRANSNAME
- Y
- TRANSNAME
-
-
- STEPNAME
- Y
- STEPNAME
-
-
- STEP_COPY
- Y
- STEP_COPY
-
-
- LINES_READ
- Y
- LINES_READ
-
-
- LINES_WRITTEN
- Y
- LINES_WRITTEN
-
-
- LINES_UPDATED
- Y
- LINES_UPDATED
-
-
- LINES_INPUT
- Y
- LINES_INPUT
-
-
- LINES_OUTPUT
- Y
- LINES_OUTPUT
-
-
- LINES_REJECTED
- Y
- LINES_REJECTED
-
-
- ERRORS
- Y
- ERRORS
-
-
- INPUT_BUFFER_ROWS
- Y
- INPUT_BUFFER_ROWS
-
-
- OUTPUT_BUFFER_ROWS
- Y
- OUTPUT_BUFFER_ROWS
-
-
-
-
-
-
-
-
- ID_BATCH
- Y
- ID_BATCH
-
-
- CHANNEL_ID
- Y
- CHANNEL_ID
-
-
- LOG_DATE
- Y
- LOG_DATE
-
-
- LOGGING_OBJECT_TYPE
- Y
- LOGGING_OBJECT_TYPE
-
-
- OBJECT_NAME
- Y
- OBJECT_NAME
-
-
- OBJECT_COPY
- Y
- OBJECT_COPY
-
-
- REPOSITORY_DIRECTORY
- Y
- REPOSITORY_DIRECTORY
-
-
- FILENAME
- Y
- FILENAME
-
-
- OBJECT_ID
- Y
- OBJECT_ID
-
-
- OBJECT_REVISION
- Y
- OBJECT_REVISION
-
-
- PARENT_CHANNEL_ID
- Y
- PARENT_CHANNEL_ID
-
-
- ROOT_CHANNEL_ID
- Y
- ROOT_CHANNEL_ID
-
-
-
-
-
-
-
-
- ID_BATCH
- Y
- ID_BATCH
-
-
- CHANNEL_ID
- Y
- CHANNEL_ID
-
-
- LOG_DATE
- Y
- LOG_DATE
-
-
- TRANSNAME
- Y
- TRANSNAME
-
-
- STEPNAME
- Y
- STEPNAME
-
-
- STEP_COPY
- Y
- STEP_COPY
-
-
- LINES_READ
- Y
- LINES_READ
-
-
- LINES_WRITTEN
- Y
- LINES_WRITTEN
-
-
- LINES_UPDATED
- Y
- LINES_UPDATED
-
-
- LINES_INPUT
- Y
- LINES_INPUT
-
-
- LINES_OUTPUT
- Y
- LINES_OUTPUT
-
-
- LINES_REJECTED
- Y
- LINES_REJECTED
-
-
- ERRORS
- Y
- ERRORS
-
-
- LOG_FIELD
- N
- LOG_FIELD
-
-
+
+
+
+
+
+
+ID_BATCHYID_BATCHCHANNEL_IDYCHANNEL_IDTRANSNAMEYTRANSNAMESTATUSYSTATUSLINES_READYLINES_READLINES_WRITTENYLINES_WRITTENLINES_UPDATEDYLINES_UPDATEDLINES_INPUTYLINES_INPUTLINES_OUTPUTYLINES_OUTPUTLINES_REJECTEDYLINES_REJECTEDERRORSYERRORSSTARTDATEYSTARTDATEENDDATEYENDDATELOGDATEYLOGDATEDEPDATEYDEPDATEREPLAYDATEYREPLAYDATELOG_FIELDYLOG_FIELD
+
+
+
+
+
+ID_BATCHYID_BATCHSEQ_NRYSEQ_NRLOGDATEYLOGDATETRANSNAMEYTRANSNAMESTEPNAMEYSTEPNAMESTEP_COPYYSTEP_COPYLINES_READYLINES_READLINES_WRITTENYLINES_WRITTENLINES_UPDATEDYLINES_UPDATEDLINES_INPUTYLINES_INPUTLINES_OUTPUTYLINES_OUTPUTLINES_REJECTEDYLINES_REJECTEDERRORSYERRORSINPUT_BUFFER_ROWSYINPUT_BUFFER_ROWSOUTPUT_BUFFER_ROWSYOUTPUT_BUFFER_ROWS
+
+
+
+
+ID_BATCHYID_BATCHCHANNEL_IDYCHANNEL_IDLOG_DATEYLOG_DATELOGGING_OBJECT_TYPEYLOGGING_OBJECT_TYPEOBJECT_NAMEYOBJECT_NAMEOBJECT_COPYYOBJECT_COPYREPOSITORY_DIRECTORYYREPOSITORY_DIRECTORYFILENAMEYFILENAMEOBJECT_IDYOBJECT_IDOBJECT_REVISIONYOBJECT_REVISIONPARENT_CHANNEL_IDYPARENT_CHANNEL_IDROOT_CHANNEL_IDYROOT_CHANNEL_ID
+
+
+
+
+ID_BATCHYID_BATCHCHANNEL_IDYCHANNEL_IDLOG_DATEYLOG_DATETRANSNAMEYTRANSNAMESTEPNAMEYSTEPNAMESTEP_COPYYSTEP_COPYLINES_READYLINES_READLINES_WRITTENYLINES_WRITTENLINES_UPDATEDYLINES_UPDATEDLINES_INPUTYLINES_INPUTLINES_OUTPUTYLINES_OUTPUTLINES_REJECTEDYLINES_REJECTEDERRORSYERRORSLOG_FIELDNLOG_FIELD
@@ -360,24 +60,92 @@
- -
- 2015/03/29 09:47:39.641
- -
- 2015/03/29 09:47:39.641
+ -
+ 2015/03/29 09:47:39.641
+ -
+ 2015/03/29 09:47:39.641
+
+ AgileBI
+ localhost
+ MONETDB
+ Native
+ pentaho-instaview
+ 50000
+ monetdb
+ Encrypted 2be98afc86aa7f2e4cb14a17edb86abd8
+
+
+
+
+ EXTRA_OPTION_INFOBRIGHT.characterEncoding
UTF-8
+ EXTRA_OPTION_MYSQL.defaultFetchSize
500
+ EXTRA_OPTION_MYSQL.useCursorFetch
true
+ PORT_NUMBER
50000
+
+
+
+ FMDB Database
+ ${FMDB_HOST}
+ MYSQL
+ Native
+ ${FMDB_DB_NAME}
+ ${FMDB_PORT}
+ ${FMDB_USER}
+ ${FMDB_PASSWORD}
+
+
+
+
+ EXTRA_OPTION_MYSQL.connectTimeout
3000
+ EXTRA_OPTION_MYSQL.socketTimeout
120000
+ FORCE_IDENTIFIERS_TO_LOWERCASE
N
+ FORCE_IDENTIFIERS_TO_UPPERCASE
N
+ IS_CLUSTERED
N
+ PORT_NUMBER
${FMDB_PORT}
+ QUOTE_ALL_FIELDS
N
+ STREAM_RESULTS
N
+ SUPPORTS_BOOLEAN_DATA_TYPE
N
+ USE_POOLING
N
+
+
+
+ Vertica
+ michael-sci-dev
+ VERTICA5
+ Native
+ aa1
+ 5433
+ dbadmin
+ Encrypted 2be98afc86aa7f2e4cb1dac71da9fa6d4
+
+
+
+
+ EXTRA_OPTION_VERTICA5.MaxPooledConnectionsPerNode
20
+ FORCE_IDENTIFIERS_TO_LOWERCASE
N
+ FORCE_IDENTIFIERS_TO_UPPERCASE
N
+ INITIAL_POOL_SIZE
2
+ IS_CLUSTERED
N
+ MAXIMUM_POOL_SIZE
15
+ POOLING_minEvictableIdleTimeMillis
1000
+ POOLING_testOnBorrow
true
+ POOLING_testOnReturn
true
+ POOLING_testWhileIdle
true
+ POOLING_timeBetweenEvictionRunsMillis
1000
+ POOLING_validationQuery
SELECT 1 FROM DUAL
+ PORT_NUMBER
5433
+ QUOTE_ALL_FIELDS
N
+ SUPPORTS_BOOLEAN_DATA_TYPE
Y
+ USE_POOLING
Y
+
+
-
- Generate Rows
- Select values
- Y
-
-
- Select values
- Protocol Buffers Decode
- Y
-
+ Read GPB fileProtocol Buffers DecodeY
+ Generate RowsGet VariablesY
+ Get VariablesRead GPB fileY
Generate Rows
@@ -385,40 +153,68 @@
Y
1
-
- none
-
-
+
+ none
+
+
1
-
-
-
-
-
-
- 200
+
+
+ 180
220
Y
-
-
+
+
+
+
+ Get Variables
+ GetVariable
+
+ Y
+ 1
+
+ none
+
+
+
+
+ filename
+ ${Internal.Transformation.Filename.Directory}/addressbook.gpb
+ String
+
+
+
+
+ -1
+ -1
+ none
+
+
+
+
+ 340
+ 220
+ Y
+
+
+
Protocol Buffers Decode
ProtobufDecode
Y
1
-
- none
-
-
-
+
+ none
+
+
+ addressbook
- file
- ///home/michael/Dev/projects/pentaho-protobuf-decode/examples/addressbook/java/target/addressbook.jar
+ ${Internal.Transformation.Filename.Directory}/java/target/addressbook.jar
- tutorial.Addressbook$AddressBook
+ tutorial.Addressbook$AddressBook
id
@@ -441,50 +237,46 @@
String
-
-
-
-
-
-
- 520
+
+
+ 720
220
Y
-
-
+
+
+
- Select values
- SelectValues
+ Read GPB file
+ ScriptValueMod
Y
1
-
- none
-
-
-
-
- address
-
- -2
- -2
-
- N
-
-
-
-
-
-
-
- 340
+
+ none
+
+
+ N
+ 9
+ 0
+ Script 1
+ file = new Packages.java.io.File(filename.toString().substr(5));
fileInputStream = new Packages.java.io.FileInputStream(file);
var addressbook = Packages.org.pentaho.di.core.Const.createByteArray(file.length());
fileInputStream.read(addressbook, 0, file.length());
fileInputStream.close();
+ addressbook
+ addressbook
+ Binary
+ -1
+ -1
+ N
+
+
+ 520
220
Y
-
-
+
+
+
-
+
- N
+ N
diff --git a/examples/addressbook/java/.classpath b/examples/addressbook/java/.classpath
new file mode 100644
index 0000000..f061ab8
--- /dev/null
+++ b/examples/addressbook/java/.classpath
@@ -0,0 +1,7 @@
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/examples/addressbook/java/.project b/examples/addressbook/java/.project
new file mode 100644
index 0000000..c86d4f3
--- /dev/null
+++ b/examples/addressbook/java/.project
@@ -0,0 +1,14 @@
+
+
+ protobuf-addressbook-example
+ NO_M2ECLIPSE_SUPPORT: Project files created with the maven-eclipse-plugin are not supported in M2Eclipse.
+
+
+
+ org.eclipse.jdt.core.javabuilder
+
+
+
+ org.eclipse.jdt.core.javanature
+
+
\ No newline at end of file
diff --git a/src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecodeDialog.java b/src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecodeDialog.java
index 996834e..8f1c5d2 100644
--- a/src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecodeDialog.java
+++ b/src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecodeDialog.java
@@ -293,8 +293,7 @@ public void widgetSelected(SelectionEvent e) {
FileDialog dialog = new FileDialog(shell, SWT.OPEN);
dialog.setFilterExtensions(new String[] { "*.jar", "*" });
if (wClasspath.getText() != null) {
- String fname = transMeta.environmentSubstitute(wClasspath
- .getText());
+ String fname = wClasspath.getText(); // transMeta.environmentSubstitute(wClasspath.getText());
dialog.setFileName(fname);
}
@@ -382,10 +381,9 @@ private void cancel() {
* Copy information from the dialog fields to the meta-data input
*/
private void setData(ProtobufDecodeMeta consumerMeta) {
- consumerMeta.setInputField(transMeta.environmentSubstitute(wInputField
- .getText()));
- consumerMeta.setClasspath(transMeta.environmentSubstitute(
- wClasspath.getText().trim()).split(File.pathSeparator));
+ consumerMeta.setInputField(wInputField.getText());
+ consumerMeta.setClasspath(wClasspath.getText().trim()
+ .split(File.pathSeparator));
consumerMeta.setRootClass(wRootClass.getText());
int nrNonEmptyFields = wFields.nrNonEmpty();
@@ -417,28 +415,33 @@ private void ok() {
private void detectFields() {
try {
- ProtobufDecoder protobufDecoder = new ProtobufDecoder(transMeta
- .environmentSubstitute(wClasspath.getText().trim()).split(
- File.pathSeparator), wRootClass.getText(), null);
- Map> fields = protobufDecoder.guessFields();
- RowMeta rowMeta = new RowMeta();
- for (Entry> e : fields.entrySet()) {
- String fieldPath = e.getKey();
- int i = fieldPath.lastIndexOf('.');
- String fieldName = i != -1 ? fieldPath.substring(i + 1)
- : fieldPath;
- rowMeta.addValueMeta(new FieldMeta(fieldName, fieldPath,
- KettleTypesConverter.javaToKettleType(e.getValue())));
+ ProtobufDecoder protobufDecoder = new ProtobufDecoder(
+ transMeta.environmentSubstitute(wClasspath.getText().trim()
+ .split(File.pathSeparator)), wRootClass.getText(),
+ null);
+ try {
+ Map> fields = protobufDecoder.guessFields();
+ RowMeta rowMeta = new RowMeta();
+ for (Entry> e : fields.entrySet()) {
+ String fieldPath = e.getKey();
+ int i = fieldPath.lastIndexOf('.');
+ String fieldName = i != -1 ? fieldPath.substring(i + 1)
+ : fieldPath;
+ rowMeta.addValueMeta(new FieldMeta(fieldName, fieldPath,
+ KettleTypesConverter.javaToKettleType(e.getValue())));
+ }
+ BaseStepDialog.getFieldsFromPrevious(rowMeta, wFields, 1,
+ new int[] { 1 }, new int[] { 3 }, -1, -1,
+ new TableItemInsertListener() {
+ public boolean tableItemInserted(
+ TableItem tableItem, ValueMetaInterface v) {
+ tableItem.setText(2, ((FieldMeta) v).path);
+ return true;
+ }
+ });
+ } finally {
+ protobufDecoder.dispose();
}
- BaseStepDialog.getFieldsFromPrevious(rowMeta, wFields, 1,
- new int[] { 1 }, new int[] { 3 }, -1, -1,
- new TableItemInsertListener() {
- public boolean tableItemInserted(TableItem tableItem,
- ValueMetaInterface v) {
- tableItem.setText(2, ((FieldMeta) v).path);
- return true;
- }
- });
} catch (ProtobufDecoderException e) {
new ErrorDialog(
shell,
diff --git a/src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecodeStep.java b/src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecodeStep.java
index 242a40d..cc74fd7 100644
--- a/src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecodeStep.java
+++ b/src/main/java/com/ruckuswireless/pentaho/protobuf/decode/ProtobufDecodeStep.java
@@ -5,7 +5,6 @@
import org.pentaho.di.core.Const;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.row.RowDataUtil;
-import org.pentaho.di.core.row.RowMetaInterface;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.BaseStep;
@@ -23,8 +22,9 @@
*/
public class ProtobufDecodeStep extends BaseStep implements StepInterface {
- public ProtobufDecodeStep(StepMeta stepMeta, StepDataInterface stepDataInterface, int copyNr, TransMeta transMeta,
- Trans trans) {
+ public ProtobufDecodeStep(StepMeta stepMeta,
+ StepDataInterface stepDataInterface, int copyNr,
+ TransMeta transMeta, Trans trans) {
super(stepMeta, stepDataInterface, copyNr, transMeta, trans);
}
@@ -34,15 +34,34 @@ public boolean init(StepMetaInterface smi, StepDataInterface sdi) {
ProtobufDecodeMeta meta = (ProtobufDecodeMeta) smi;
ProtobufDecodeData data = (ProtobufDecodeData) sdi;
try {
- data.decoder = new ProtobufDecoder(meta.getClasspath(), meta.getRootClass(), meta.getFields());
+ data.decoder = new ProtobufDecoder(
+ environmentSubstitute(meta.getClasspath()),
+ meta.getRootClass(), meta.getFields());
} catch (ProtobufDecoderException e) {
- logError(Messages.getString("ProtobufDecodeStep.Init.Error", getStepname()), e);
+ logError(Messages.getString("ProtobufDecodeStep.Dispose.Error",
+ getStepname()), e);
return false;
}
return true;
}
- public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws KettleException {
+ public void dispose(StepMetaInterface smi, StepDataInterface sdi) {
+
+ ProtobufDecodeData data = (ProtobufDecodeData) sdi;
+ if (data.decoder != null) {
+ try {
+ data.decoder.dispose();
+ } catch (ProtobufDecoderException e) {
+ logError(Messages.getString("ProtobufDecodeStep.Init.Error",
+ getStepname()), e);
+ }
+ data.decoder = null;
+ }
+ super.dispose(smi, sdi);
+ }
+
+ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi)
+ throws KettleException {
Object[] r = getRow();
if (r == null) {
setOutputDone();
@@ -52,27 +71,28 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
ProtobufDecodeMeta meta = (ProtobufDecodeMeta) smi;
ProtobufDecodeData data = (ProtobufDecodeData) sdi;
- RowMetaInterface inputRowMeta = getInputRowMeta();
-
if (first) {
first = false;
- data.outputRowMeta = inputRowMeta.clone();
+ data.outputRowMeta = getInputRowMeta().clone();
meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
String inputField = environmentSubstitute(meta.getInputField());
int numErrors = 0;
if (Const.isEmpty(inputField)) {
- logError(Messages.getString("ProtobufDecodeStep.Log.FieldNameIsNull")); //$NON-NLS-1$
+ logError(Messages
+ .getString("ProtobufDecodeStep.Log.FieldNameIsNull")); //$NON-NLS-1$
numErrors++;
}
- data.inputFieldNr = inputRowMeta.indexOfValue(inputField);
+ data.inputFieldNr = getInputRowMeta().indexOfValue(inputField);
if (data.inputFieldNr < 0) {
- logError(Messages.getString("ProtobufDecodeStep.Log.CouldntFindField", inputField)); //$NON-NLS-1$
+ logError(Messages.getString(
+ "ProtobufDecodeStep.Log.CouldntFindField", inputField)); //$NON-NLS-1$
numErrors++;
}
- if (!inputRowMeta.getValueMeta(data.inputFieldNr).isBinary()) {
- logError(Messages.getString("ProtobufDecodeStep.Log.FieldNotValid", inputField)); //$NON-NLS-1$
+ if (!getInputRowMeta().getValueMeta(data.inputFieldNr).isBinary()) {
+ logError(Messages.getString(
+ "ProtobufDecodeStep.Log.FieldNotValid", inputField)); //$NON-NLS-1$
numErrors++;
}
if (numErrors > 0) {
@@ -80,19 +100,23 @@ public boolean processRow(StepMetaInterface smi, StepDataInterface sdi) throws K
stopAll();
return false;
}
- data.inputFieldMeta = inputRowMeta.getValueMeta(data.inputFieldNr);
+ data.inputFieldMeta = getInputRowMeta().getValueMeta(
+ data.inputFieldNr);
}
try {
- byte[] message = data.inputFieldMeta.getBinary(r[data.inputFieldNr]);
+ byte[] message = data.inputFieldMeta
+ .getBinary(r[data.inputFieldNr]);
try {
List