Skip to content

Commit

Permalink
Merge branch 'release/v3.0.0'
Browse files Browse the repository at this point in the history
Conflicts:
	com.ibm.streamsx.kafka/info.xml
  • Loading branch information
RolefH committed Nov 26, 2019
2 parents 982d9aa + 0eb43c6 commit 3db53ad
Show file tree
Hide file tree
Showing 99 changed files with 2,949 additions and 974 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ task archive(type: Tar) {
baseName = "com.ibm.streamsx.kafka"
version = toolkitVersion
doLast {
ant.checksum file: archivePath
ant.checksum file: archivePath, algorithm: 'SHA-1', fileext: '.sha1'
}
}

Expand Down
21 changes: 11 additions & 10 deletions com.ibm.streamsx.kafka/.classpath
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@
</attributes>
</classpathentry>
<classpathentry kind="src" path="impl/java/src"/>
<classpathentry path="impl/java/bin" kind="output"/>
<classpathentry path="bin" kind="output"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.8/"/>
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/org.apache.kafka/kafka-clients/2.2.1/59902a774ef54815b49f2ba5f2e21a7bb44f1b80/kafka-clients-2.2.1-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/org.apache.kafka/kafka-clients/2.2.1/3a9480801401f5eafdfeefad6d65493a7c5a7b6d/kafka-clients-2.2.1.jar">
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/org.apache.kafka/kafka-clients/2.3.1/7bf757fc4e58be63a26c04fb5eb2d8c1f93ea353/kafka-clients-2.3.1-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/org.apache.kafka/kafka-clients/2.3.1/21664b0318d201ec412f9b02357b07ba94009c87/kafka-clients-2.3.1.jar">
<attributes>
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/org.apache.kafka/kafka-clients/2.2.1/a230a86dd91669fc0a45bed35e0daba821b8ff16/kafka-clients-2.2.1-javadoc.jar!/"/>
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/org.apache.kafka/kafka-clients/2.3.1/2a0f31874b4543a1dca0d6375e663c9c5ff0950c/kafka-clients-2.3.1-javadoc.jar!/"/>
</attributes>
</classpathentry>
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/org.apache.commons/commons-lang3/3.5/f7d878153e86a1cdddf6b37850e00a9f8bff726f/commons-lang3-3.5-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/org.apache.commons/commons-lang3/3.5/6c6c702c89bfff3cd9e80b04d668c5e190d588c6/commons-lang3-3.5.jar">
Expand All @@ -29,24 +30,24 @@
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-log4j12/1.7.21/bd44b1f0c34da545f0495c05f0cf74e2f3231b14/slf4j-log4j12-1.7.21-javadoc.jar!/"/>
</attributes>
</classpathentry>
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/com.github.luben/zstd-jni/1.3.8-1/760e39e10cbd12f47239ad6f6d6bc7c2a39a3d9c/zstd-jni-1.3.8-1-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/com.github.luben/zstd-jni/1.3.8-1/455bd7c5a8a68b1aebcace632b6ebcb4fd92d32a/zstd-jni-1.3.8-1.jar">
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/com.github.luben/zstd-jni/1.4.0-1/613644b5d81071cc72f52b984500034165a982a4/zstd-jni-1.4.0-1-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/com.github.luben/zstd-jni/1.4.0-1/8cc7f9b0f2bd53ef3bd0e38fccc678691f983474/zstd-jni-1.4.0-1.jar">
<attributes>
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/com.github.luben/zstd-jni/1.3.8-1/4a1b0bcd562cc93cc2fc44f1f170f25e2857345b/zstd-jni-1.3.8-1-javadoc.jar!/"/>
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/com.github.luben/zstd-jni/1.4.0-1/3584875946d9c4211a6b2642d9b68f0822693eea/zstd-jni-1.4.0-1-javadoc.jar!/"/>
</attributes>
</classpathentry>
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/org.lz4/lz4-java/1.5.0/e38079cc05d15f0f023f58495b1f1d4533bedf8d/lz4-java-1.5.0-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/org.lz4/lz4-java/1.5.0/d36fb639f06aaa4f17307625f80e2e32f815672a/lz4-java-1.5.0.jar">
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/org.lz4/lz4-java/1.6.0/f70c1919da33149ca0dc1a3ab7bd99e332d7830d/lz4-java-1.6.0-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/org.lz4/lz4-java/1.6.0/b49e2b422a5b7145ba7aa0c3f60c13664a5c5acf/lz4-java-1.6.0.jar">
<attributes>
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/org.lz4/lz4-java/1.5.0/2511fe1703ba3137d78dd068c60e69a3b87ad5a5/lz4-java-1.5.0-javadoc.jar!/"/>
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/org.lz4/lz4-java/1.6.0/ef1f0a5746e3dcd3c1fe145094db9389e5c368c9/lz4-java-1.6.0-javadoc.jar!/"/>
</attributes>
</classpathentry>
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/org.xerial.snappy/snappy-java/1.1.7.2/73b98fff2c1dcd337dbdac7182bf747b5aee2873/snappy-java-1.1.7.2-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/org.xerial.snappy/snappy-java/1.1.7.2/307b286efd119ad2c6d4291128bf110bddc68088/snappy-java-1.1.7.2.jar">
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/org.xerial.snappy/snappy-java/1.1.7.3/83002a7c6641e17c16ecfad3342fb8e22033b775/snappy-java-1.1.7.3-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/org.xerial.snappy/snappy-java/1.1.7.3/241bb74a1eb37d68a4e324a4bc3865427de0a62d/snappy-java-1.1.7.3.jar">
<attributes>
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/org.xerial.snappy/snappy-java/1.1.7.2/2276814529c52cf4c7cceafe49725ef940f8b160/snappy-java-1.1.7.2-javadoc.jar!/"/>
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/org.xerial.snappy/snappy-java/1.1.7.3/cfe26ce602b1b5ac81555f6a3e2f18aba60d6ae8/snappy-java-1.1.7.3-javadoc.jar!/"/>
</attributes>
</classpathentry>
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.25/962153db4a9ea71b79d047dfd1b2a0d80d8f4739/slf4j-api-1.7.25-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.25/da76ca59f6a57ee3102f8f9bd9cee742973efa8a/slf4j-api-1.7.25.jar">
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.26/21c84cdf9da108216b5e402611d4af479b60cb8/slf4j-api-1.7.26-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.26/77100a62c2e6f04b53977b9f541044d7d722693d/slf4j-api-1.7.26.jar">
<attributes>
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.25/f003e89ddebdbaca5cf3fc7cc0c67c88c2ca1a3c/slf4j-api-1.7.25-javadoc.jar!/"/>
<attribute name="javadoc_location" value="jar:file:/home/rolef/.gradle/caches/modules-2/files-2.1/org.slf4j/slf4j-api/1.7.26/83863db711c68b9d5542c5fa0cd801b7035281d7/slf4j-api-1.7.26-javadoc.jar!/"/>
</attributes>
</classpathentry>
<classpathentry sourcepath="/home/rolef/.gradle/caches/modules-2/files-2.1/log4j/log4j/1.2.17/677abe279b68c5e7490d6d50c6951376238d7d3e/log4j-1.2.17-sources.jar" kind="lib" path="/home/rolef/.gradle/caches/modules-2/files-2.1/log4j/log4j/1.2.17/5af35056b4d257e4b64b9e8069c0746e8b08629f/log4j-1.2.17.jar">
Expand Down
31 changes: 19 additions & 12 deletions com.ibm.streamsx.kafka/.project
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,33 @@
<projectDescription>
<name>com.ibm.streamsx.kafka</name>
<comment></comment>
<projects/>
<natures>
<nature>org.eclipse.xtext.ui.shared.xtextNature</nature>
<nature>com.ibm.streams.studio.splproject.SPLProjectNature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.eclipse.jdt.core.javabuilder</name>
<arguments/>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>org.eclipse.xtext.ui.shared.xtextBuilder</name>
<arguments/>
<arguments>
</arguments>
</buildCommand>
<buildCommand>
<name>com.ibm.streams.studio.splproject.builder.SPLProjectBuilder</name>
<arguments/>
<name>org.eclipse.ui.externaltools.ExternalToolBuilder</name>
<triggers>full,incremental,</triggers>
<arguments>
<dictionary>
<key>LaunchConfigHandle</key>
<value>&lt;project&gt;/.externalToolBuilders/com.ibm.streams.studio.splproject.builder.SPLProjectBuilder.launch</value>
</dictionary>
</arguments>
</buildCommand>
</buildSpec>
<linkedResources/>
<filteredResources/>
<natures>
<nature>org.eclipse.xtext.ui.shared.xtextNature</nature>
<nature>com.ibm.streams.studio.splproject.SPLProjectNature</nature>
<nature>org.eclipse.jdt.core.javanature</nature>
</natures>
</projectDescription>
4 changes: 2 additions & 2 deletions com.ibm.streamsx.kafka/.settings/org.eclipse.jdt.core.prefs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
#
#Thu Sep 14 06:32:00 EDT 2017
#Fri Aug 09 10:08:10 CEST 2019
org.eclipse.jdt.core.compiler.problem.enumIdentifier=error
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.8
eclipse.preferences.version=1
org.eclipse.jdt.core.compiler.source=1.8
org.eclipse.jdt.core.compiler.problem.assertIdentifier=error
org.eclipse.jdt.core.compiler.source=1.8
org.eclipse.jdt.core.compiler.processAnnotations=enabled
org.eclipse.jdt.core.compiler.compliance=1.8
5 changes: 3 additions & 2 deletions com.ibm.streamsx.kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ def STREAMS_INSTALL = System.getenv("STREAMS_INSTALL")

dependencies {
compile fileTree(dir: STREAMS_INSTALL + '/lib', include: ['com.ibm.streams.operator.jar'])
// compile group: 'org.apache.kafka', name: 'kafka-clients', version: '1.0.0'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.2.1'
// compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.2.1'
compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.1'
compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.5'
compile group: 'com.google.code.gson', name: 'gson', version: '2.8.1'
compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.21'
Expand All @@ -68,6 +68,7 @@ task deleteDeps(type: Delete) {
delete "opt"
delete "bin"
delete "output"
delete "impl/java/bin"
delete fileTree(dir : 'com.ibm.streamsx.kafka', exclude : ['*.spl'])
delete "com.ibm.streamsx.kafka.messagehub"
delete fileTree(dir : 'impl/lib/', include : ['com.ibm.streamsx.kafka*.jar'])
Expand Down
129 changes: 107 additions & 22 deletions com.ibm.streamsx.kafka/com.ibm.streamsx.kafka/Functions.spl
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,92 @@ rstring _createTopicPartitionOffsetObject (rstring topic, int32 partition) {
";
}

rstring _createTopicObject (rstring topic) {
return "
{
\"topic\" : \"" + topic + "\"
}
";
}

/**
* Creates the JSON message to add a single topic to the operator's subscription to begin consuming from the default position.
*
* @param topic The topic to subscribe
*
* @return A JSON string to be submitted to the KafkaConsumer input port
*/
public rstring createMessageAddTopic (rstring topic) {
return createMessageAddTopics ([topic]);
}

/**
* Creates the JSON message to add multiple topics to the operator's subscription to begin consuming from the default position.
*
* @param topics The topics to subscribe
*
* @return A JSON string to be submitted to the KafkaConsumer input port
*/
public rstring createMessageAddTopics (list<rstring> topics) {

int32 listSize = size(topics);
mutable rstring toAdd = "";
for(int32 i in range(0, listSize)) {
toAdd += _createTopicObject (topics[i]);

if(i+1 < listSize)
toAdd += ",";
}

return "{
\"action\" : \"ADD\",
\"topics\" : [" +
toAdd +
"]
}";
}

/**
* Creates the JSON message to remove a single topic from the operator's subscription.
*
* @param topic The topic to unsubscribe
*
* @return A JSON string to be submitted to the KafkaConsumer input port
*/
public rstring createMessageRemoveTopic (rstring topic) {
return createMessageRemoveTopics ([topic]);
}

/**
* Creates the JSON message to remove multiple topics from the operator's subscription.
*
* @param topics The topics to unsubscribe
*
* @return A JSON string to be submitted to the KafkaConsumer input port
*/
public rstring createMessageRemoveTopics (list<rstring> topics) {

int32 listSize = size(topics);
mutable rstring toRemove = "";
for(int32 i in range(0, listSize)) {
toRemove += _createTopicObject (topics[i]);

if (i+1 < listSize)
toRemove += ",";
}

return "{
\"action\" : \"REMOVE\",
\"topics\" : [" +
toRemove +
"]
}";
}


/**
*
* Creates the JSON message to remove multiple topic-partitions from the operator.
* Creates the JSON message to remove multiple topic partitions from the operator's partition assignment.
*
* @param topicPartitionsToRemove specifies a list of topic partitions to remove
*
Expand All @@ -46,14 +129,14 @@ public rstring createMessageRemoveTopicPartition (list<Control.TopicPartition> t
}

/**
* Creates the JSON message to add multiple topic-partitions to the operator.
* The operator will begin consuming from the topic-partitions at their specified offset.
* Creates the JSON message to add multiple topic partitions to the operator's partition assignment.
* The operator will begin consuming from the topic partitions at their specified offset.
*
* * To begin consuming from the **end** of a topic-partition, set the offset value to `-1`
* * To begin consuming from the **beginning** of a topic-partition, set the offset value to `-2`
* * To begin consuming from the **default** position, set the offset value to `-3`
* * To begin consuming from the **end** of a topic partition, set the offset value to `-1`
* * To begin consuming from the **beginning** of a topic partition, set the offset value to `-2`
* * To begin consuming from the **default** position, set the offset value to `-3`, what is effectively equivalent to `rstring createMessageAddTopicPartition (list<Control.TopicPartition> topicPartitionsToAdd)`
*
* @param topicPartitionsToAdd A list of topic-partitions to subscribe to along with the corresponding offset number to begin consuming from.
* @param topicPartitionsToAdd A list of topic partitions to assign to along with the corresponding offset number to begin consuming from.
*
* @return A JSON string to be submitted to the KafkaConsumer input port
*/
Expand All @@ -76,10 +159,10 @@ public rstring createMessageAddTopicPartition (list<Control.TopicPartitionOffset
}

/**
* Creates the JSON message to add multiple topic-partitions to the operator.
* Creates the JSON message to add multiple topic partitions to the operator's partition assignment.
* The operator will begin consuming all partitions from the default positions.
*
* @param topicPartitionsToAdd A list of topic-partitions to assign.
* @param topicPartitionsToAdd A list of topic partitions to assign.
*
* @return A JSON string to be submitted to the KafkaConsumer input port
*/
Expand All @@ -103,10 +186,10 @@ public rstring createMessageAddTopicPartition (list<Control.TopicPartition> topi


/**
* Creates the JSON message to remove a single topic-partition from the operator.
* Creates the JSON message to remove a single topic partition from the operator's partition assignment.
*
* @param topic The topic to unsubscribe from
* @param partition The partition to unsubscribe from
* @param topic The topic to which the partition belongs to
* @param partition The partition number of the topic to unassign
*
* @return A JSON string to be submitted to the KafkaConsumer input port
*/
Expand All @@ -115,15 +198,16 @@ public rstring createMessageRemoveTopicPartition (rstring topic, int32 partition
}

/**
* Creates the JSON message to add a single topic-partition to the operator and to begin consuming at the specified offset.
* Creates the JSON message to add a single topic partition to the operator's partition assignment
* and to begin consuming at the specified offset.
*
* * To begin consuming from the **end** of a topic-partition, set the offset value to `-1`
* * To begin consuming from the **beginning** of a topic-partition, set the offset value to `-2`
* * To begin consuming from the **default** position, set the offset value to `-3`
* * To begin consuming from the **end** of a topic partition, set the offset value to `-1`
* * To begin consuming from the **beginning** of a topic partition, set the offset value to `-2`
* * To begin consuming from the **default** position, set the offset value to `-3`, what is effectively equivalent to `rstring createMessageAddTopicPartition (rstring topic, int32 partition)`
*
* @param topic The topic to subscribe to
* @param partition The partition number to assign to
* @param offset The offset of the topic-partition to begin consuming from
* @param topic The topic to which the partition belongs to
* @param partition The partition number of the topic to assign to
* @param offset The offset of the topic partition to begin consuming from
*
* @return A JSON string to be submitted to the KafkaConsumer input port
*/
Expand All @@ -132,10 +216,11 @@ public rstring createMessageAddTopicPartition (rstring topic, int32 partition, i
}

/**
* Creates the JSON message to add a single topic-partition to the operator and to begin consuming at the default position.
* Creates the JSON message to add a single topic partition to the operator's partition assignment and
* to begin consuming at the default position.
*
* @param topic The topic to subscribe to
* @param partition The partition number to assign to
* @param topic The topic to which the partition belongs to
* @param partition The partition number of the topic to assign to
*
* @return A JSON string to be submitted to the KafkaConsumer input port
*/
Expand Down
Loading

0 comments on commit 3db53ad

Please sign in to comment.