Skip to content
This repository has been archived by the owner on Apr 18, 2018. It is now read-only.

API for adding java-based spouts to Pyleus topologies #99

Open
wants to merge 7 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions docs/source/external.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
.. _external:

Integrating with java-based spouts
==================================

Pyleus topologies can include externally defined spouts, in order to enable integration with `existing Storm spout libraries`_ or other custom data sources. Pyleus ships with a ``kafka`` spout provider but you implement your own
providers to expose include other java-based spouts in your pyleus topologies as well.

Implementing a spout provider
-----------------------------

In order to give Pyleus topologies access to a java-based spout, you need to implement a ``java`` class class that can
extract any relevant configuration information provided in your topology yaml file and use it to create an instance of the spout in question.

Your java class must implement the ``com.yelp.pyleus.SpoutProvider`` interface. This interface requires you to implement the following method:

.. code-block:: java

public IRichSpout provide(final TopologyBuilder builder, final SpoutSpec spec) { }

If you use ``maven`` you can add the ``pyleus-base`` project and ``storm-core`` as dependencies in order to include the above interface and its related classes:

.. code-block:: xml

<dependencies>
<dependency>
<groupId>com.yelp.pyleus</groupId>
<artifactId>pyleus-base</artifactId>
<version>0.2.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
<scope>provided</scope>
</dependency>
</dependencies>

.. note::

Because the ``pyleus-base`` is not currently available in Maven Central, you will need to build it from source and install it in your local environment, nexus server or manually add it to your classpath if you are not using ``maven``:

#. Checkout the Pyleus project using git: ``git clone https://github.com/Yelp/pyleus.git``
#. Run the ``mvn install`` command from the ``pyleus/topology_builder/`` directory. The ``pyleus-base-0.2.4.jar`` should now be available in the ``pyleus/topology_builder/target/`` directory and in your local maven repository cache.

You can extract any spout configuration parameters provided in the ``options`` section of your topology yaml file via the ``SpoutSpec`` object that is passed into the ``provide()`` method. ``Spoutspec.options`` is a map of ``String`` parameter names to their ``Object`` values.

For example, given this external spout declaration in a pyleus topology yaml:

.. code-block:: yaml

- spout:
name: sentence-spout
type: sentence
output_fields:
- sentence
options:
sentence: This is a sentence that will be emitted twice a second.
sentencesPerMin: 120

The specified options can be extracted from ``SpoutSpec`` in your ``SpoutProvider`` implementation as follows:

.. code-block:: java

// extracting Strings:
String sentence = "No Sentence Specified";
Object o = spec.options.get("sentence");
if (o != null && o instanceof String)
sentence = (String) o;

// extracting numeric types:
Integer sentencesPerMin = null;
o = spec.options.get("sentencesPerMin");
if (o != null)
// this will fail fast in case of non-null, invalid numeric value.
sentencesPerMin = Integer.valueOf(o.toString());

.. seealso::

You can find a functional example of a java spout provider in the ``java_spout_provider`` example in the `GitHub repo`_, in the ``java_spout_src`` directory.

Adding spout providers to pyleus.conf
-------------------------------------

Once you have implemented a ``SpoutProvider`` and compiled it into a ``.jar``, you make it available to your pyleus projects by adding it to the ``pyleus.conf`` you use to build your topologies. Each spout provider should have an alias in the ``plugins`` section of that file as follows:

.. code-block:: ini

[plugins]
alias: full.class.name.of.SpoutProviderImpl
example_sp: com.example.ExampleSpoutProvider

The alias(es) defined in the plugin section can be refereces as spout ``types`` in your topology file. Any options defined in your topology yaml will be passed to an instance of the the spout provider java class associated with that alias.

In addition to adding the spout provider class, you aslo need to add your spout provider jar, along with any other required java dependencies by defining the ``include_java_jars`` property in the ``build`` section of your ``pyleus.conf``. You can specify multiple jar files seperated by spaces and/or directories containing jar files. For example:

.. code-block:: ini

[build]
include_java_jars: /path/to/my/spout_provider.jar ~/another.jar /some/directory/full/of/jars

.. danger::

Do not include any dependencies that are already part of the Storm distribution or already included with Pyleus. Any classes including by pyleus during the build process will replace identically named classes in the java jars you include, so referencing a different version of a jar included with Pyleus can also cause errors.

.. seealso::

See :ref:`configuration` for a list of all the settings supported in the Pyleus configuration file.

Adding external spouts to your topology
---------------------------------------

Once your spout providers have been added to your ``pyleus.conf`` you can add them as spouts in your topology yaml.

.. code-block:: yaml

- spout:
name: sentence-spout
type: sentence
output_fields:
- sentence
options:
sentence: This is a sentence that will be emitted twice a second.
sentencesPerMin: 120

The ``type`` should match one of the alias values defined in the ``plugins`` section of your ``pyleus.conf`` file.

The ``output_fields`` is a list of the output values emitted by the spout.

Any additional properties specified under ``options`` will also be passed to the spout provider.

Once the spout is defined in your topology you can reference it by name in your bolt definitions the same way that you would with normal Pyleus bolt.


.. _existing Storm spout libraries: https://storm.apache.org/about/integrates.html
.. _GitHub repo: https://github.com/Yelp/pyleus/tree/develop/examples
1 change: 1 addition & 0 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ Documentation
options
parallelism
tick
external
logging
yaml
install
Expand Down
36 changes: 36 additions & 0 deletions examples/java_spout_provider/java_spout_src/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.yelp.pyleus</groupId>
<artifactId>example-spout-provider</artifactId>
<version>0.0.1-SNAPSHOT</version>

<dependencies>
<dependency>
<groupId>com.yelp.pyleus</groupId>
<artifactId>pyleus-base</artifactId>
<version>0.2.4</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<version>0.9.2-incubating</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.1</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package com.yelp.pyleus.example;

import java.util.Map;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class ExampleSpout extends BaseRichSpout {

private static final long serialVersionUID = 1L;

private SpoutOutputCollector collector;
private final String sentence;
private int sentencesPerMin = 30;

public ExampleSpout(String sentence) {
this.sentence = sentence;
}

public void nextTuple() {
Utils.sleep(1000 * 60 / sentencesPerMin);
collector.emit(new Values(sentence));
}

public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("sentence"));
}

public int getSentencesPerMin() {
return sentencesPerMin;
}

public void setSentencesPerMin(int sentencesPerMin) {
this.sentencesPerMin = sentencesPerMin;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.yelp.pyleus.example;

import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.TopologyBuilder;

import com.yelp.pyleus.SpoutProvider;
import com.yelp.pyleus.spec.SpoutSpec;

public class ExampleSpoutProvider implements SpoutProvider {

public IRichSpout provide(TopologyBuilder builder, SpoutSpec spec) {
String sentence = "No Sentence Specified";
Object o = spec.options.get("sentence");
if (o != null && o instanceof String)
sentence = (String) o;

Integer sentencesPerMin = null;
o = spec.options.get("sentencesPerMin");
if (o != null)
// this will fail fast in case of non-null, invalid numeric value.
sentencesPerMin = Integer.valueOf(o.toString());

ExampleSpout out = new ExampleSpout(sentence);
if (sentencesPerMin != null && sentencesPerMin.intValue() > 0)
out.setSentencesPerMin(sentencesPerMin);
return out;
}

}
7 changes: 7 additions & 0 deletions examples/java_spout_provider/pyleus.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
[build]
pypi_index_url: http://0.0.0.0:7778/simple/

include_java_jars: java_spout_src/target/example-spout-provider-0.0.1-SNAPSHOT.jar

[plugins]
sentence: com.yelp.pyleus.example.ExampleSpoutProvider
20 changes: 20 additions & 0 deletions examples/java_spout_provider/pyleus_topology.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Very simple example of a topology with an external (java-based) spout.

name: java_spout_provider

topology:

- spout:
name: sentence-spout
type: sentence
output_fields:
- sentence
options:
sentence: This is a sentence that will be emitted twice a second.
sentencesPerMin: 120

- bolt:
name: sentence-consumer-bolt
module: sentence_consumer.word_counting_bolt
groupings:
- shuffle_grouping: sentence-spout
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from pyleus.storm import SimpleBolt

class WordCountingBolt(SimpleBolt):

OUTPUT_FIELDS = ["wc"]

def process_tuple(self, tup):
sentence = tup.values[0]
cnt = len(sentence.split())
self.emit((cnt,), anchors=[tup])

if __name__ == '__main__':
WordCountingBolt().run()
24 changes: 23 additions & 1 deletion pyleus/cli/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,14 +176,28 @@ def _copy_dir_content(src, dst, exclude):
else:
shutil.copy2(t, dst)

def _extract_plugin_jars(jar_paths, dest_dir):
if jar_paths is None:
return
for path in jar_paths:
for f in glob.glob(path):
if os.path.isfile(f) and not f.endswith(".jar"):
continue
elif os.path.isdir(f):
for sub in glob.glob(os.path.join(f, "*.jar")):
_open_jar(sub).extractall(dest_dir)
else:
_open_jar(f).extractall(dest_dir)

def _create_pyleus_jar(original_topology_spec, topology_dir, base_jar,
output_jar, zip_file, tmp_dir, include_packages,
system_site_packages, pypi_index_url, verbose):
include_java_jars, system_site_packages,
pypi_index_url, verbose):
"""Coordinate the creation of the the topology JAR:

- Validate the topology
- Extract the base JAR into a temporary directory
- Copy plugin jar contents into the directory
- Copy all source files into the directory
- If using virtualenv, create it and install dependencies
- Re-pack the temporary directory into the final JAR
Expand All @@ -201,6 +215,8 @@ def _create_pyleus_jar(original_topology_spec, topology_dir, base_jar,

_validate_venv(topology_dir, venv)

_extract_plugin_jars(include_java_jars, tmp_dir)

# Extract pyleus base jar content in a tmp dir
zip_file.extractall(tmp_dir)

Expand Down Expand Up @@ -278,6 +294,11 @@ def build_topology_jar(configs):
if configs.include_packages is not None:
include_packages = configs.include_packages.split(" ")

# Extract list of java plugin jars to add to the output_jar
include_java_jars = None
if configs.include_java_jars is not None:
include_java_jars = configs.include_java_jars.split(" ")

# Open the base jar as a zip
zip_file = _open_jar(base_jar)

Expand All @@ -293,6 +314,7 @@ def build_topology_jar(configs):
zip_file=zip_file,
tmp_dir=tmp_dir,
include_packages=include_packages,
include_java_jars=include_java_jars,
system_site_packages=configs.system_site_packages,
pypi_index_url=configs.pypi_index_url,
verbose=configs.verbose,
Expand Down
Loading