diff --git a/docs/source/external.rst b/docs/source/external.rst
new file mode 100644
index 0000000..08bc26b
--- /dev/null
+++ b/docs/source/external.rst
@@ -0,0 +1,137 @@
+.. _external:
+
+Integrating with java-based spouts
+==================================
+
+Pyleus topologies can include externally defined spouts, enabling integration with `existing Storm spout libraries`_ or other custom data sources. Pyleus ships with a ``kafka`` spout provider, but you can implement your own
+providers to include other java-based spouts in your pyleus topologies as well.
+
+Implementing a spout provider
+-----------------------------
+
+In order to give Pyleus topologies access to a java-based spout, you need to implement a java class that can
+extract any relevant configuration information from your topology yaml file and use it to create an instance of the spout in question.
+
+Your java class must implement the ``com.yelp.pyleus.SpoutProvider`` interface. This interface requires you to implement the following method:
+
+.. code-block:: java
+
+    public IRichSpout provide(final TopologyBuilder builder, final SpoutSpec spec);
+
+If you use ``maven`` you can add the ``pyleus-base`` project and ``storm-core`` as dependencies in order to include the above interface and its related classes:
+
+.. code-block:: xml
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yelp.pyleus</groupId>
+            <artifactId>pyleus-base</artifactId>
+            <version>0.2.4</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>0.9.2-incubating</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+.. note::
+
+   Because ``pyleus-base`` is not currently available in Maven Central, you will need to build it from source and install it into your local repository or Nexus server, or add it to your classpath manually if you are not using ``maven``:
+
+   #. Check out the Pyleus project using git: ``git clone https://github.com/Yelp/pyleus.git``
+ #. Run the ``mvn install`` command from the ``pyleus/topology_builder/`` directory. The ``pyleus-base-0.2.4.jar`` should now be available in the ``pyleus/topology_builder/target/`` directory and in your local maven repository cache.
+
+You can extract any spout configuration parameters provided in the ``options`` section of your topology yaml file via the ``SpoutSpec`` object passed into the ``provide()`` method. ``SpoutSpec.options`` is a map of ``String`` parameter names to their ``Object`` values.
+
+For example, given this external spout declaration in a pyleus topology yaml:
+
+.. code-block:: yaml
+
+ - spout:
+ name: sentence-spout
+ type: sentence
+ output_fields:
+ - sentence
+ options:
+ sentence: This is a sentence that will be emitted twice a second.
+ sentencesPerMin: 120
+
+The specified options can be extracted from ``SpoutSpec`` in your ``SpoutProvider`` implementation as follows:
+
+.. code-block:: java
+
+ // extracting Strings:
+ String sentence = "No Sentence Specified";
+ Object o = spec.options.get("sentence");
+ if (o != null && o instanceof String)
+ sentence = (String) o;
+
+ // extracting numeric types:
+ Integer sentencesPerMin = null;
+ o = spec.options.get("sentencesPerMin");
+ if (o != null)
+ // this will fail fast in case of non-null, invalid numeric value.
+ sentencesPerMin = Integer.valueOf(o.toString());
+
+.. seealso::
+
+ You can find a functional example of a java spout provider in the ``java_spout_provider`` example in the `GitHub repo`_, in the ``java_spout_src`` directory.
+
+Adding spout providers to pyleus.conf
+-------------------------------------
+
+Once you have implemented a ``SpoutProvider`` and compiled it into a ``.jar``, you make it available to your pyleus projects by adding it to the ``pyleus.conf`` you use to build your topologies. Each spout provider should have an alias in the ``plugins`` section of that file as follows:
+
+.. code-block:: ini
+
+ [plugins]
+ alias: full.class.name.of.SpoutProviderImpl
+ example_sp: com.example.ExampleSpoutProvider
+
+The alias(es) defined in the ``plugins`` section can be referenced as spout ``type`` values in your topology file. Any options defined in your topology yaml will be passed to an instance of the spout provider java class associated with that alias.
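
Under the hood, when a topology is submitted or run, each configured alias is forwarded to the java topology builder as a ``--provider.<alias>=<class>`` command-line argument. A minimal Python sketch of that translation, modeled on the ``_add_plugins_to_cmd`` helper added to ``pyleus/cli/storm_cluster.py`` in this change:

```python
# Sketch: turn (alias, class) pairs into --provider.* arguments for the
# java topology builder. Mirrors _add_plugins_to_cmd in pyleus.cli.storm_cluster.
PROVIDER_ARG_PFIX = "--provider."

def add_plugins_to_cmd(cmd, plugins):
    """Append one --provider.<alias>=<class> argument per configured plugin."""
    cmd += [
        "{0}{1}={2}".format(PROVIDER_ARG_PFIX, alias, provider_class)
        for alias, provider_class in plugins
    ]
    return cmd

cmd = add_plugins_to_cmd(
    ["jar", "topology.jar", "com.yelp.pyleus.PyleusTopologyBuilder"],
    [("sentence", "com.yelp.pyleus.example.ExampleSpoutProvider")],
)
# cmd now ends with:
# --provider.sentence=com.yelp.pyleus.example.ExampleSpoutProvider
```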
+
+In addition to adding the spout provider class, you also need to add your spout provider jar, along with any other required java dependencies, by defining the ``include_java_jars`` property in the ``build`` section of your ``pyleus.conf``. You can specify multiple jar files separated by spaces and/or directories containing jar files. For example:
+
+.. code-block:: ini
+
+ [build]
+ include_java_jars: /path/to/my/spout_provider.jar ~/another.jar /some/directory/full/of/jars
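
At build time, each space-separated entry is glob-expanded, directories are scanned for ``*.jar`` files, and the resulting jars are unpacked into the topology jar. A simplified Python model of the expansion (assuming behavior like the ``_extract_plugin_jars`` helper added to ``pyleus/cli/build.py`` in this change, except that the real code extracts each jar rather than returning its path):

```python
import glob
import os

def collect_jars(include_java_jars):
    """Expand the space-separated include_java_jars value into .jar paths.

    Each entry may be a jar file, a glob pattern, or a directory of jars;
    files without a .jar extension are silently skipped.
    """
    jars = []
    for path in include_java_jars.split(" "):
        for match in glob.glob(path):
            if os.path.isdir(match):
                # Directories contribute every *.jar they contain.
                jars.extend(glob.glob(os.path.join(match, "*.jar")))
            elif match.endswith(".jar"):
                jars.append(match)
    return jars
```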
+
+.. danger::
+
+   Do not include any dependencies that are already part of the Storm distribution or already included with Pyleus. Any classes included by Pyleus during the build process will replace identically named classes in the java jars you include, so referencing a different version of a jar bundled with Pyleus can also cause errors.
+
+.. seealso::
+
+ See :ref:`configuration` for a list of all the settings supported in the Pyleus configuration file.
+
+Adding external spouts to your topology
+---------------------------------------
+
+Once your spout providers have been added to your ``pyleus.conf`` you can add them as spouts in your topology yaml.
+
+.. code-block:: yaml
+
+ - spout:
+ name: sentence-spout
+ type: sentence
+ output_fields:
+ - sentence
+ options:
+ sentence: This is a sentence that will be emitted twice a second.
+ sentencesPerMin: 120
+
+The ``type`` should match one of the alias values defined in the ``plugins`` section of your ``pyleus.conf`` file.
+
+``output_fields`` is a list of the output values emitted by the spout.
+
+Any additional properties specified under ``options`` will also be passed to the spout provider.
+
+Once the spout is defined in your topology you can reference it by name in your bolt definitions, the same way that you would with a normal Pyleus spout.
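
For instance, the ``java_spout_provider`` example in this change wires a Python bolt to the spout defined above through a standard grouping:

.. code-block:: yaml

    - bolt:
        name: sentence-consumer-bolt
        module: sentence_consumer.word_counting_bolt
        groupings:
            - shuffle_grouping: sentence-spout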
+
+
+.. _existing Storm spout libraries: https://storm.apache.org/about/integrates.html
+.. _GitHub repo: https://github.com/Yelp/pyleus/tree/develop/examples
diff --git a/docs/source/index.rst b/docs/source/index.rst
index 98f93ef..802b1bc 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -70,6 +70,7 @@ Documentation
options
parallelism
tick
+ external
logging
yaml
install
diff --git a/examples/java_spout_provider/java_spout_src/pom.xml b/examples/java_spout_provider/java_spout_src/pom.xml
new file mode 100644
index 0000000..1551274
--- /dev/null
+++ b/examples/java_spout_provider/java_spout_src/pom.xml
@@ -0,0 +1,36 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>com.yelp.pyleus</groupId>
+    <artifactId>example-spout-provider</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.yelp.pyleus</groupId>
+            <artifactId>pyleus-base</artifactId>
+            <version>0.2.4</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>0.9.2-incubating</version>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>2.3.1</version>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/examples/java_spout_provider/java_spout_src/src/main/java/com/yelp/pyleus/example/ExampleSpout.java b/examples/java_spout_provider/java_spout_src/src/main/java/com/yelp/pyleus/example/ExampleSpout.java
new file mode 100644
index 0000000..5a83273
--- /dev/null
+++ b/examples/java_spout_provider/java_spout_src/src/main/java/com/yelp/pyleus/example/ExampleSpout.java
@@ -0,0 +1,46 @@
+package com.yelp.pyleus.example;
+
+import java.util.Map;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+public class ExampleSpout extends BaseRichSpout {
+
+ private static final long serialVersionUID = 1L;
+
+ private SpoutOutputCollector collector;
+ private final String sentence;
+ private int sentencesPerMin = 30;
+
+ public ExampleSpout(String sentence) {
+ this.sentence = sentence;
+ }
+
+ public void nextTuple() {
+ Utils.sleep(1000 * 60 / sentencesPerMin);
+ collector.emit(new Values(sentence));
+ }
+
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("sentence"));
+ }
+
+ public int getSentencesPerMin() {
+ return sentencesPerMin;
+ }
+
+ public void setSentencesPerMin(int sentencesPerMin) {
+ this.sentencesPerMin = sentencesPerMin;
+ }
+
+}
diff --git a/examples/java_spout_provider/java_spout_src/src/main/java/com/yelp/pyleus/example/ExampleSpoutProvider.java b/examples/java_spout_provider/java_spout_src/src/main/java/com/yelp/pyleus/example/ExampleSpoutProvider.java
new file mode 100644
index 0000000..b074f7c
--- /dev/null
+++ b/examples/java_spout_provider/java_spout_src/src/main/java/com/yelp/pyleus/example/ExampleSpoutProvider.java
@@ -0,0 +1,29 @@
+package com.yelp.pyleus.example;
+
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+
+import com.yelp.pyleus.SpoutProvider;
+import com.yelp.pyleus.spec.SpoutSpec;
+
+public class ExampleSpoutProvider implements SpoutProvider {
+
+ public IRichSpout provide(TopologyBuilder builder, SpoutSpec spec) {
+ String sentence = "No Sentence Specified";
+ Object o = spec.options.get("sentence");
+ if (o != null && o instanceof String)
+ sentence = (String) o;
+
+ Integer sentencesPerMin = null;
+ o = spec.options.get("sentencesPerMin");
+ if (o != null)
+ // this will fail fast in case of non-null, invalid numeric value.
+ sentencesPerMin = Integer.valueOf(o.toString());
+
+ ExampleSpout out = new ExampleSpout(sentence);
+ if (sentencesPerMin != null && sentencesPerMin.intValue() > 0)
+ out.setSentencesPerMin(sentencesPerMin);
+ return out;
+ }
+
+}
diff --git a/examples/java_spout_provider/pyleus.conf b/examples/java_spout_provider/pyleus.conf
new file mode 100644
index 0000000..a69b59c
--- /dev/null
+++ b/examples/java_spout_provider/pyleus.conf
@@ -0,0 +1,7 @@
+[build]
+pypi_index_url: http://0.0.0.0:7778/simple/
+
+include_java_jars: java_spout_src/target/example-spout-provider-0.0.1-SNAPSHOT.jar
+
+[plugins]
+sentence: com.yelp.pyleus.example.ExampleSpoutProvider
diff --git a/examples/java_spout_provider/pyleus_topology.yaml b/examples/java_spout_provider/pyleus_topology.yaml
new file mode 100644
index 0000000..95e3b5f
--- /dev/null
+++ b/examples/java_spout_provider/pyleus_topology.yaml
@@ -0,0 +1,20 @@
+# Very simple example of a topology with an external (java-based) spout.
+
+name: java_spout_provider
+
+topology:
+
+ - spout:
+ name: sentence-spout
+ type: sentence
+ output_fields:
+ - sentence
+ options:
+ sentence: This is a sentence that will be emitted twice a second.
+ sentencesPerMin: 120
+
+ - bolt:
+ name: sentence-consumer-bolt
+ module: sentence_consumer.word_counting_bolt
+ groupings:
+ - shuffle_grouping: sentence-spout
\ No newline at end of file
diff --git a/examples/java_spout_provider/sentence_consumer/__init__.py b/examples/java_spout_provider/sentence_consumer/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/java_spout_provider/sentence_consumer/word_counting_bolt.py b/examples/java_spout_provider/sentence_consumer/word_counting_bolt.py
new file mode 100644
index 0000000..e60abd8
--- /dev/null
+++ b/examples/java_spout_provider/sentence_consumer/word_counting_bolt.py
@@ -0,0 +1,13 @@
+from pyleus.storm import SimpleBolt
+
+class WordCountingBolt(SimpleBolt):
+
+ OUTPUT_FIELDS = ["wc"]
+
+ def process_tuple(self, tup):
+ sentence = tup.values[0]
+ cnt = len(sentence.split())
+ self.emit((cnt,), anchors=[tup])
+
+if __name__ == '__main__':
+ WordCountingBolt().run()
\ No newline at end of file
diff --git a/pyleus/cli/build.py b/pyleus/cli/build.py
index ca9f477..95134b1 100644
--- a/pyleus/cli/build.py
+++ b/pyleus/cli/build.py
@@ -176,14 +176,28 @@ def _copy_dir_content(src, dst, exclude):
else:
shutil.copy2(t, dst)
+def _extract_plugin_jars(jar_paths, dest_dir):
+ if jar_paths is None:
+ return
+ for path in jar_paths:
+ for f in glob.glob(path):
+ if os.path.isfile(f) and not f.endswith(".jar"):
+ continue
+ elif os.path.isdir(f):
+ for sub in glob.glob(os.path.join(f, "*.jar")):
+ _open_jar(sub).extractall(dest_dir)
+ else:
+ _open_jar(f).extractall(dest_dir)
def _create_pyleus_jar(original_topology_spec, topology_dir, base_jar,
output_jar, zip_file, tmp_dir, include_packages,
- system_site_packages, pypi_index_url, verbose):
+ include_java_jars, system_site_packages,
+ pypi_index_url, verbose):
"""Coordinate the creation of the the topology JAR:
- Validate the topology
- Extract the base JAR into a temporary directory
+ - Copy plugin jar contents into the directory
- Copy all source files into the directory
- If using virtualenv, create it and install dependencies
- Re-pack the temporary directory into the final JAR
@@ -201,6 +215,8 @@ def _create_pyleus_jar(original_topology_spec, topology_dir, base_jar,
_validate_venv(topology_dir, venv)
+ _extract_plugin_jars(include_java_jars, tmp_dir)
+
# Extract pyleus base jar content in a tmp dir
zip_file.extractall(tmp_dir)
@@ -278,6 +294,11 @@ def build_topology_jar(configs):
if configs.include_packages is not None:
include_packages = configs.include_packages.split(" ")
+ # Extract list of java plugin jars to add to the output_jar
+ include_java_jars = None
+ if configs.include_java_jars is not None:
+ include_java_jars = configs.include_java_jars.split(" ")
+
# Open the base jar as a zip
zip_file = _open_jar(base_jar)
@@ -293,6 +314,7 @@ def build_topology_jar(configs):
zip_file=zip_file,
tmp_dir=tmp_dir,
include_packages=include_packages,
+ include_java_jars=include_java_jars,
system_site_packages=configs.system_site_packages,
pypi_index_url=configs.pypi_index_url,
verbose=configs.verbose,
diff --git a/pyleus/cli/storm_cluster.py b/pyleus/cli/storm_cluster.py
index ac65b3e..779d287 100644
--- a/pyleus/cli/storm_cluster.py
+++ b/pyleus/cli/storm_cluster.py
@@ -15,6 +15,7 @@
LOCAL_OPTION = "--local"
DEBUG_OPTION = "--debug"
STORM_JAR_JVM_OPTS = "STORM_JAR_JVM_OPTS"
+PROVIDER_ARG_PFIX = "--provider."
def _watch_over_storm(storm_pid):
@@ -43,6 +44,17 @@ def _get_storm_cmd_env(jvm_opts):
return None
+def _add_plugins_to_cmd(cmd, plugins):
+    """Adds the supplied plugins to the supplied storm command.
+ """
+ # Pass any plugin classes to the topology builder as
+ # --provider.=canonical.provider.class.name
+    # --provider.<alias>=canonical.provider.class.name
+ cmd += [
+ "{0}{1}={2}".format(PROVIDER_ARG_PFIX, k, v)
+ for (k, v)
+ in plugins
+ ]
class StormCluster(object):
"""Object representing an interface to a Storm cluster.
@@ -50,7 +62,7 @@ class StormCluster(object):
"""
def __init__(self, storm_cmd_path, nimbus_host, nimbus_port, verbose,
- jvm_opts):
+ jvm_opts, plugins):
"""Create the cluster object."""
self.storm_cmd_path = storm_cmd_path
@@ -65,6 +77,7 @@ def __init__(self, storm_cmd_path, nimbus_host, nimbus_port, verbose,
self.nimbus_port = nimbus_port
self.verbose = verbose
self.jvm_opts = jvm_opts
+ self.plugins = plugins
def _build_storm_cmd(self, cmd):
storm_cmd = [self.storm_cmd_path]
@@ -103,6 +116,8 @@ def submit(self, jar_path):
"""Submit the pyleus topology jar to the Storm cluster."""
cmd = ["jar", jar_path, TOPOLOGY_BUILDER_CLASS]
+ _add_plugins_to_cmd(cmd, self.plugins)
+
self._exec_storm_cmd(cmd)
def list(self):
@@ -128,7 +143,7 @@ class LocalStormCluster(object):
All the requests are basically translated into Storm commands.
"""
- def run(self, storm_cmd_path, jar_path, debug, jvm_opts):
+ def run(self, storm_cmd_path, jar_path, debug, jvm_opts, plugins):
"""Run locally a pyleus topology jar.
Note: In order to trigger the local mode for the selected topology,
@@ -145,6 +160,8 @@ def run(self, storm_cmd_path, jar_path, debug, jvm_opts):
if debug:
storm_cmd.append(DEBUG_OPTION)
+ _add_plugins_to_cmd(storm_cmd, plugins)
+
env = _get_storm_cmd_env(jvm_opts)
# Having no feedback from Storm misses the point of running a topology
diff --git a/pyleus/cli/topologies.py b/pyleus/cli/topologies.py
index 3a538b4..46125f7 100644
--- a/pyleus/cli/topologies.py
+++ b/pyleus/cli/topologies.py
@@ -24,7 +24,8 @@ def run_topology_locally(jar_path, configs):
configs.storm_cmd_path,
jar_path,
configs.debug,
- configs.jvm_opts)
+ configs.jvm_opts,
+ configs.plugins)
def submit_topology(jar_path, configs):
@@ -34,7 +35,8 @@ def submit_topology(jar_path, configs):
configs.nimbus_host,
configs.nimbus_port,
configs.verbose,
- configs.jvm_opts).submit(jar_path)
+ configs.jvm_opts,
+ configs.plugins).submit(jar_path)
def list_topologies(configs):
@@ -44,7 +46,8 @@ def list_topologies(configs):
configs.nimbus_host,
configs.nimbus_port,
configs.verbose,
- configs.jvm_opts).list()
+ configs.jvm_opts,
+ configs.plugins).list()
def kill_topology(configs):
@@ -54,7 +57,8 @@ def kill_topology(configs):
configs.nimbus_host,
configs.nimbus_port,
configs.verbose,
- configs.jvm_opts).kill(configs.topology_name, configs.wait_time)
+ configs.jvm_opts,
+ configs.plugins).kill(configs.topology_name, configs.wait_time)
def is_jar(jar_path):
diff --git a/pyleus/cli/topology_spec.py b/pyleus/cli/topology_spec.py
index 06de545..18831a8 100644
--- a/pyleus/cli/topology_spec.py
+++ b/pyleus/cli/topology_spec.py
@@ -307,9 +307,12 @@ def __init__(self, specs):
.format(self.name, "module"))
self.module = specs["module"]
- if self.type == "kafka":
+ elif self.type == "kafka":
self.output_fields = {DEFAULT_STREAM: ["message"]}
+ else:
+ self.output_fields = {DEFAULT_STREAM: specs['output_fields']}
+
def update_from_module(self, specs):
"""Specific spout validation. Spouts must have output fields."""
super(SpoutSpec, self).update_from_module(specs)
diff --git a/pyleus/configuration.py b/pyleus/configuration.py
index a76a5f6..dd75383 100644
--- a/pyleus/configuration.py
+++ b/pyleus/configuration.py
@@ -43,6 +43,16 @@
# list of packages to always include in your topologies
include_packages: foo bar<4.0 baz==0.1
+
+ # list of .jar files or directories of .jar files that will be
+ # added to the classpath of your topology separated by spaces.
+ include_java_jars: /home/my/plugin/some.jar /home/also/works/for/directories
+
+ [plugins]
+ # list any external spout providers referenced in your yaml and the
+ # java classes they correspond to.
+ amqp: com.my.AMQPSpoutProvider
+ jms: com.some.other.JMSProviderClass
"""
from __future__ import absolute_import
@@ -54,6 +64,7 @@
from pyleus.exception import ConfigurationError
from pyleus.compat import configparser
+PLUGIN_SECTION = "plugins"
# Configuration files paths in order of increasing precedence
# Please keep in sync with module docstring
@@ -65,10 +76,10 @@
Configuration = collections.namedtuple(
"Configuration",
- "base_jar config_file debug func include_packages output_jar \
- pypi_index_url nimbus_host nimbus_port storm_cmd_path \
+ "base_jar config_file debug func include_packages include_java_jars \
+ output_jar pypi_index_url nimbus_host nimbus_port storm_cmd_path \
system_site_packages topology_path topology_jar topology_name verbose \
- wait_time jvm_opts"
+ wait_time jvm_opts plugins"
)
"""Namedtuple containing all pyleus configuration values."""
@@ -79,6 +90,7 @@
debug=False,
func=None,
include_packages=None,
+ include_java_jars=None,
output_jar=None,
pypi_index_url=None,
nimbus_host=None,
@@ -91,6 +103,7 @@
verbose=False,
wait_time=None,
jvm_opts=None,
+ plugins=[],
)
@@ -132,12 +145,14 @@ def load_configuration(cmd_line_file):
config = configparser.SafeConfigParser()
config.read(config_files_hierarchy)
- configs = update_configuration(
- DEFAULTS,
- dict(
- (config_name, config_value)
- for section in config.sections()
- for config_name, config_value in config.items(section)
- )
+ update_dict = dict(
+ (config_name, config_value)
+ for section in config.sections() if section != PLUGIN_SECTION
+ for config_name, config_value in config.items(section)
)
+
+ if (config.has_section(PLUGIN_SECTION)):
+ update_dict["plugins"] = config.items(PLUGIN_SECTION)
+
+ configs = update_configuration(DEFAULTS, update_dict)
return configs
diff --git a/tests/cli/build_test.py b/tests/cli/build_test.py
index 719ce2e..ad5c9a4 100644
--- a/tests/cli/build_test.py
+++ b/tests/cli/build_test.py
@@ -177,3 +177,41 @@ def mock_execute_module(module, cwd):
build._remove_pyleus_base_jar(mock_venv)
assert not mock_remove.called
+
+ @mock.patch.object(build, '_open_jar', autospec=True)
+ @mock.patch.object(os.path, 'isfile', autospec=True)
+ @mock.patch.object(glob, 'glob', autospec=True)
+ def test__extract_plugin_jars_with_jars(self, mock_glob, mock_isfile, mock__open_jar):
+ jars = ["/foo/bar/some.jar", "/bar/other.jar"]
+ mock_glob.side_effect = lambda value: [value]
+ mock_isfile.side_effect = lambda value: True
+ build._extract_plugin_jars(jars, "/tmp")
+ mock__open_jar.assert_any_call(jars[0])
+ mock__open_jar.assert_any_call(jars[1])
+
+ @mock.patch.object(build, '_open_jar', autospec=True)
+ @mock.patch.object(os.path, 'isfile', autospec=True)
+ @mock.patch.object(glob, 'glob', autospec=True)
+ def test__extract_plugin_jars_ignores_non_jars(self, mock_glob, mock_isfile, mock__open_jar):
+ jars = ["/foo/bar/some.jar", "/bar/other.notjar"]
+ mock_glob.side_effect = lambda value: [value]
+ mock_isfile.side_effect = lambda value: True
+ build._extract_plugin_jars(jars, "/tmp")
+ mock__open_jar.assert_called_with(jars[0])
+
+ @mock.patch.object(build, '_open_jar', autospec=True)
+ @mock.patch.object(os.path, 'isdir', autospec=True)
+ @mock.patch.object(glob, 'glob', autospec=True)
+ def test__extract_plugin_jars_expands_dirs(self, mock_glob, mock_isdir, mock__open_jar):
+ jar_dir = "/foo/bar/"
+ jars = ["some.jar", "other.jar"]
+ def glob_result(path):
+ if path == jar_dir:
+ return [jar_dir]
+ else:
+ return jars
+ mock_glob.side_effect = glob_result
+ mock_isdir.side_effect = lambda value: True
+ build._extract_plugin_jars(jars, "/tmp")
+ mock__open_jar.assert_any_call(jars[0])
+ mock__open_jar.assert_any_call(jars[1])
\ No newline at end of file
diff --git a/tests/cli/storm_cluster_test.py b/tests/cli/storm_cluster_test.py
index 5c40db9..915608c 100644
--- a/tests/cli/storm_cluster_test.py
+++ b/tests/cli/storm_cluster_test.py
@@ -4,6 +4,7 @@
from pyleus.cli.storm_cluster import _get_storm_cmd_env
from pyleus.cli.storm_cluster import STORM_JAR_JVM_OPTS
+from pyleus.cli.storm_cluster import PROVIDER_ARG_PFIX
from pyleus.cli.storm_cluster import StormCluster
from pyleus.cli.storm_cluster import TOPOLOGY_BUILDER_CLASS
from pyleus.testing import mock
@@ -34,6 +35,7 @@ def cluster(self):
mock.sentinel.nimbus_port,
mock.sentinel.verbose,
mock.sentinel.jvm_opts,
+ [],
)
def test__build_storm_cmd_no_port(self, cluster):
@@ -56,3 +58,15 @@ def test_submit(self, cluster):
cluster.submit(mock.sentinel.jar_path)
mock_exec.assert_called_once_with(["jar", mock.sentinel.jar_path, TOPOLOGY_BUILDER_CLASS])
+
+ def test_submit_with_plugins(self, cluster):
+ plugins = [("alias", "plugin.Class1"), ("otherone", "plugin.OtherOne")]
+ expected = (["jar", mock.sentinel.jar_path, TOPOLOGY_BUILDER_CLASS]
+ + [PROVIDER_ARG_PFIX+a+"="+b for (a,b) in plugins])
+
+ cluster.plugins.extend(plugins)
+ with mock.patch.object(cluster, '_exec_storm_cmd', autospec=True) as mock_exec:
+ cluster.submit(mock.sentinel.jar_path)
+
+ mock_exec.assert_called_once_with(expected)
+
diff --git a/tests/cli/topologies_test.py b/tests/cli/topologies_test.py
index 51c3183..31b4946 100644
--- a/tests/cli/topologies_test.py
+++ b/tests/cli/topologies_test.py
@@ -39,6 +39,7 @@ def test_submit_topology(configs):
configs.nimbus_port,
configs.verbose,
configs.jvm_opts,
+ configs.plugins,
)
mock_storm_cluster.submit.assert_called_once_with(mock.sentinel.jar_path)
@@ -57,6 +58,7 @@ def test_kill_topology(configs):
configs.nimbus_port,
configs.verbose,
configs.jvm_opts,
+ configs.plugins,
)
mock_storm_cluster.kill.assert_called_once_with(configs.topology_name, configs.wait_time)
@@ -75,6 +77,7 @@ def test_list_topologies(configs):
configs.nimbus_port,
configs.verbose,
configs.jvm_opts,
+ configs.plugins,
)
mock_storm_cluster.list.assert_called_once_with()
diff --git a/tests/configuration_test.py b/tests/configuration_test.py
index 4e6a289..d157b57 100644
--- a/tests/configuration_test.py
+++ b/tests/configuration_test.py
@@ -37,3 +37,39 @@ def test_update_configuration(self):
assert default_config.pypi_index_url == None
assert updated_config.pypi_index_url == \
"http://pypi-ninja.ninjacorp.com/simple"
+
+ def test_update_configuration_with_plugins(self):
+ default_config = configuration.DEFAULTS
+ update_dict = {
+ "plugins": [("a","some.Class"), ("b","some.other.Class")]
+ }
+ updated_config = configuration.update_configuration(
+ default_config, update_dict)
+ assert default_config.plugins == []
+ assert updated_config.plugins == [
+ ("a","some.Class"),
+ ("b","some.other.Class")
+ ]
+
+ @mock.patch.object(os.path, "exists", autospec=True)
+ @mock.patch.object(os.path, "isfile", autospec=True)
+ @mock.patch("pyleus.configuration.configparser.SafeConfigParser")
+ def test_load_configuration_with_plugins(
+        self, MockSafeConfigParser, mock_isfile, mock_exists):
+
+ expected_plugins = [
+ ("alias1", "java.class.named.Alias1"),
+ ("alias2", "java.class.named.Alias2")
+ ]
+
+ def get_plugins(arg=None):
+ if arg == "plugins":
+ return expected_plugins
+ else:
+ return []
+
+ cfp = configuration.configparser.SafeConfigParser()
+ cfp.items.side_effect = get_plugins
+ config = configuration.load_configuration("")
+ cfp.items.assert_called_with("plugins")
+ assert config.plugins == expected_plugins
diff --git a/topology_builder/src/main/java/com/yelp/pyleus/PyleusTopologyBuilder.java b/topology_builder/src/main/java/com/yelp/pyleus/PyleusTopologyBuilder.java
index 53a1414..f79ea5d 100644
--- a/topology_builder/src/main/java/com/yelp/pyleus/PyleusTopologyBuilder.java
+++ b/topology_builder/src/main/java/com/yelp/pyleus/PyleusTopologyBuilder.java
@@ -2,6 +2,7 @@
import java.io.FileNotFoundException;
import java.io.InputStream;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -17,13 +18,9 @@
import backtype.storm.topology.SpoutDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
-import storm.kafka.KafkaSpout;
-import storm.kafka.KeyValueSchemeAsMultiScheme;
-import storm.kafka.SpoutConfig;
-import storm.kafka.StringKeyValueScheme;
-import storm.kafka.ZkHosts;
import com.yelp.pyleus.bolt.PythonBolt;
+import com.yelp.pyleus.kafka.KafkaSpoutProvider;
import com.yelp.pyleus.spec.BoltSpec;
import com.yelp.pyleus.spec.ComponentSpec;
import com.yelp.pyleus.spec.SpoutSpec;
@@ -39,11 +36,20 @@ public class PyleusTopologyBuilder {
public static final PythonComponentsFactory pyFactory = new PythonComponentsFactory();
+ private static final String USAGE = "Usage: PyleusTopologyBuilder [--local [--debug]]";
+ private static final String PROVIDER_ARG_PFIX = "--provider.";
+
+    private static final Map<String, String> providers = new HashMap<String, String>();
+
+ static {
+ providers.put("kafka", KafkaSpoutProvider.class.getCanonicalName());
+ }
+
public static void handleBolt(final TopologyBuilder builder, final BoltSpec spec,
- final TopologySpec topologySpec) {
+ final TopologySpec topologySpec) {
- PythonBolt bolt = pyFactory.createPythonBolt(spec.module,
- spec.options, topologySpec.logging_config, topologySpec.serializer);
+ PythonBolt bolt = pyFactory.createPythonBolt(spec.module, spec.options,
+ topologySpec.logging_config, topologySpec.serializer);
if (spec.output_fields != null) {
bolt.setOutputFields(spec.output_fields);
@@ -96,11 +102,11 @@ public static void handleBolt(final TopologyBuilder builder, final BoltSpec spec
}
public static void handleSpout(final TopologyBuilder builder, final SpoutSpec spec,
- final TopologySpec topologySpec) {
+ final TopologySpec topologySpec) {
IRichSpout spout;
- if (spec.type.equals("kafka")) {
- spout = handleKafkaSpout(builder, spec);
+ if (providers.containsKey(spec.type)) {
+ spout = handleProvidedSpout(builder, spec);
} else {
spout = handlePythonSpout(builder, spec, topologySpec);
}
@@ -117,61 +123,29 @@ public static void handleSpout(final TopologyBuilder builder, final SpoutSpec sp
}
}
- public static IRichSpout handleKafkaSpout(
- @SuppressWarnings("unused") final TopologyBuilder builder,
- final SpoutSpec spec) {
- String topic = (String) spec.options.get("topic");
- if (topic == null) {
- throw new RuntimeException("Kafka spout must have topic");
- }
-
- String zkHosts = (String) spec.options.get("zk_hosts");
- if (zkHosts == null) {
- throw new RuntimeException("Kafka spout must have zk_hosts");
- }
-
- String zkRoot = (String) spec.options.get("zk_root");
- if (zkRoot == null) {
- zkRoot = String.format(KAFKA_ZK_ROOT_FMT, spec.name);
- }
-
- String consumerId = (String) spec.options.get("consumer_id");
- if (consumerId == null) {
- consumerId = String.format(KAFKA_CONSUMER_ID_FMT, spec.name);
- }
-
- SpoutConfig config = new SpoutConfig(
- new ZkHosts(zkHosts),
- topic,
- zkRoot,
- consumerId
- );
-
- Boolean forceFromStart = (Boolean) spec.options.get("from_start");
- if (forceFromStart != null) {
- config.forceFromStart = forceFromStart;
- }
-
- Long startOffsetTime = (Long) spec.options.get("start_offset_time");
- if (startOffsetTime != null) {
- config.startOffsetTime = startOffsetTime;
+ public static IRichSpout handleProvidedSpout(final TopologyBuilder builder, final SpoutSpec spec) {
+ String providerClassName = providers.get(spec.type);
+ try {
+            Class<?> providerClass = Class.forName(providerClassName);
+ if (!SpoutProvider.class.isAssignableFrom(providerClass))
+ throw new RuntimeException(String.format("%s does not implement SpoutProvider.",
+ providerClassName));
+ SpoutProvider provider = (SpoutProvider) providerClass.newInstance();
+ return provider.provide(builder, spec);
+ } catch (ClassNotFoundException ex) {
+ throw new RuntimeException(ex);
+ } catch (InstantiationException ex) {
+ throw new RuntimeException(ex);
+ } catch (IllegalAccessException ex) {
+ throw new RuntimeException(ex);
}
-
- // TODO: this mandates that messages are UTF-8. We should allow for binary data
- // in the future, or once users can have Java components, let them provide their
- // own JSON serialization method. Or wait on STORM-138.
- config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
-
- return new KafkaSpout(config);
}
- public static IRichSpout handlePythonSpout(
- @SuppressWarnings("unused") final TopologyBuilder builder,
- final SpoutSpec spec,
+ public static IRichSpout handlePythonSpout(final TopologyBuilder builder, final SpoutSpec spec,
final TopologySpec topologySpec) {
- PythonSpout spout = pyFactory.createPythonSpout(spec.module,
- spec.options, topologySpec.logging_config, topologySpec.serializer);
+ PythonSpout spout = pyFactory.createPythonSpout(spec.module, spec.options,
+ topologySpec.logging_config, topologySpec.serializer);
if (spec.output_fields != null) {
spout.setOutputFields(spec.output_fields);
@@ -195,14 +169,16 @@ public static StormTopology buildTopology(final TopologySpec spec) {
} else if (component.isSpout()) {
handleSpout(builder, component.spout, spec);
} else {
- throw new RuntimeException(String.format("Unknown component: only bolts and spouts are supported."));
+ throw new RuntimeException(
+ String.format("Unknown component: only bolts and spouts are supported."));
}
}
return builder.createTopology();
}
- private static InputStream getYamlInputStream(final String filename) throws FileNotFoundException {
+ private static InputStream getYamlInputStream(final String filename)
+ throws FileNotFoundException {
return PyleusTopologyBuilder.class.getResourceAsStream(filename);
}
@@ -217,7 +193,8 @@ private static void setSerializer(Config conf, final String serializer) {
}
}
- private static void runLocally(final String topologyName, final StormTopology topology, boolean debug, final String serializer) {
+ private static void runLocally(final String topologyName, final StormTopology topology,
+ boolean debug, final String serializer) {
Config conf = new Config();
setSerializer(conf, serializer);
conf.setDebug(debug);
@@ -226,6 +203,7 @@ private static void runLocally(final String topologyName, final StormTopology to
final LocalCluster cluster = new LocalCluster();
Runtime.getRuntime().addShutdownHook(new Thread() {
+ @Override
public void run() {
try {
cluster.shutdown();
@@ -249,28 +227,31 @@ public static void main(String[] args) {
boolean runLocally = false;
boolean debug = false;
- if (args.length > 2) {
- System.err.println("Usage: PyleusTopologyBuilder [--local [--debug]]");
- System.exit(1);
- }
-
- if (args.length == 1) {
- if (args[0].equals("--local")) {
+ for (String arg : args) {
+ if (arg.equals("--local")) {
runLocally = true;
+ } else if (arg.equals("--debug")) {
+ debug = true;
+ } else if (arg.startsWith(PROVIDER_ARG_PFIX)) {
+ String[] providerTuple = arg.substring(PROVIDER_ARG_PFIX.length()).split("=");
+ if (providerTuple.length != 2) {
+ System.err.println("Invalid parameter: " + arg);
+ System.err.println(USAGE);
+ System.exit(1);
+ }
+ System.out.println("provider: " + providerTuple[0] + " = " + providerTuple[1]);
+ providers.put(providerTuple[0], providerTuple[1]);
} else {
- System.err.println("Usage: PyleusTopologyBuilder [--local [--debug]]");
+ System.err.println("Invalid parameter: " + arg);
+ System.err.println(USAGE);
System.exit(1);
}
}
- if (args.length == 2) {
- if (args[0].equals("--local") && args[1].equals("--debug")) {
- runLocally = true;
- debug = true;
- } else {
- System.err.println("Usage: PyleusTopologyBuilder [--local [--debug]]");
- System.exit(1);
- }
+ if (debug && !runLocally) {
+ System.err.println("--debug option is only available when running locally.");
+ System.err.println(USAGE);
+ System.exit(1);
}
final InputStream yamlInputStream;
diff --git a/topology_builder/src/main/java/com/yelp/pyleus/SpoutProvider.java b/topology_builder/src/main/java/com/yelp/pyleus/SpoutProvider.java
new file mode 100644
index 0000000..29a03bc
--- /dev/null
+++ b/topology_builder/src/main/java/com/yelp/pyleus/SpoutProvider.java
@@ -0,0 +1,12 @@
+package com.yelp.pyleus;
+
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+
+import com.yelp.pyleus.spec.SpoutSpec;
+
+public interface SpoutProvider {
+
+ IRichSpout provide(final TopologyBuilder builder,
+ final SpoutSpec spec);
+}
diff --git a/topology_builder/src/main/java/com/yelp/pyleus/kafka/KafkaSpoutProvider.java b/topology_builder/src/main/java/com/yelp/pyleus/kafka/KafkaSpoutProvider.java
new file mode 100644
index 0000000..da18ee9
--- /dev/null
+++ b/topology_builder/src/main/java/com/yelp/pyleus/kafka/KafkaSpoutProvider.java
@@ -0,0 +1,62 @@
+package com.yelp.pyleus.kafka;
+
+import storm.kafka.KafkaSpout;
+import storm.kafka.KeyValueSchemeAsMultiScheme;
+import storm.kafka.SpoutConfig;
+import storm.kafka.StringKeyValueScheme;
+import storm.kafka.ZkHosts;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+
+import com.yelp.pyleus.SpoutProvider;
+import com.yelp.pyleus.spec.SpoutSpec;
+
+public class KafkaSpoutProvider implements SpoutProvider {
+
+ public static final String KAFKA_ZK_ROOT_FMT = "/pyleus-kafka-offsets/%s";
+ public static final String KAFKA_CONSUMER_ID_FMT = "pyleus-%s";
+
+ @Override
+ public IRichSpout provide(TopologyBuilder builder, SpoutSpec spec) {
+ String topic = (String) spec.options.get("topic");
+ if (topic == null) { throw new RuntimeException("Kafka spout must have topic"); }
+
+ String zkHosts = (String) spec.options.get("zk_hosts");
+ if (zkHosts == null) { throw new RuntimeException("Kafka spout must have zk_hosts"); }
+
+ String zkRoot = (String) spec.options.get("zk_root");
+ if (zkRoot == null) {
+ zkRoot = String.format(KAFKA_ZK_ROOT_FMT, spec.name);
+ }
+
+ String consumerId = (String) spec.options.get("consumer_id");
+ if (consumerId == null) {
+ consumerId = String.format(KAFKA_CONSUMER_ID_FMT, spec.name);
+ }
+
+ SpoutConfig config = new SpoutConfig(
+ new ZkHosts(zkHosts),
+ topic,
+ zkRoot,
+ consumerId
+ );
+
+ Boolean forceFromStart = (Boolean) spec.options.get("from_start");
+ if (forceFromStart != null) {
+ config.forceFromStart = forceFromStart;
+ }
+
+ Object startOffsetTime = spec.options.get("start_offset_time");
+ if (startOffsetTime != null) {
+ config.startOffsetTime = Long.valueOf(startOffsetTime.toString());
+ }
+
+ // TODO: this mandates that messages are UTF-8. We should allow for binary data
+ // in the future, or once users can have Java components, let them provide their
+ // own JSON serialization method. Or wait on STORM-138.
+ config.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
+
+ return new KafkaSpout(config);
+ }
+
+}