When streaming raw event data to a PNDA cluster via Kafka, the raw data are encoded using an Avro schema together with extra metadata: a timestamp, the data source, and the host IP. The Avro-encoded records (raw bytes plus metadata) are persisted in HDFS and organized on a per-source, per-year, per-month, per-day, and per-hour basis.
To make sense of the data, an application developer needs to extract data from a specific folder, decode it against the PNDA Avro schema, and incorporate it into data analytics processes. Instead of duplicating these common steps in every application, you can use the platform libraries, a Python library that hides low-level data operations behind ready-to-use functions for accessing PNDA data and performing data analytics with Spark. Most importantly, the platform libraries can be used as an extension framework for custom data preprocessing and custom data operations.
As shown in the diagram below, the platform libraries place an abstraction layer on top of the low-level HDFS interfaces and allow you to rapidly develop PySpark applications or interactive notebooks (powered by Jupyter).
+-- platformlibs
|   +-- common_helpers: defines PNDA-specific utility functions
|   +-- config_helper: defines configuration utilities (currently only loads and parses INI files)
|   +-- data_handler
|   |   +-- DataHandler: abstract data handler class
|   +-- simple_data_handler
|   |   +-- SimpleDataHandler: default data handler class that only loads data from HDFS as a Spark RDD (no business logic implemented)
|   +-- json_data_handler
|   |   +-- JsonDataHandler: JSON data handler class that assumes 'rawdata' is encoded in JSON format
|   +-- xr_data_handler
|   |   +-- XrDataHandler: a sample implementation of a handler for XR telemetry data
By default, the library is pre-installed on the edge node and the data nodes of every successfully provisioned PNDA cluster. As an example, follow these steps to start a PySpark session and load data on the edge node:
$ sudo -u pnda pyspark --master yarn-client
>>> from platformlibs.simple_data_handler import SimpleDataHandler
>>> source = 'your data source or topic name' # the folder name under /user/pnda/PNDA_datasets/datasets/source|topic=?
>>> path = 'year=?/month=?/hour=?' # any sub-directory under /user/pnda/PNDA_datasets/datasets/source|topic=?
>>> handler = SimpleDataHandler(sc, source, path, True) # set the isTopic flag to True if a Kafka topic is used as the source
>>> rdd = handler.rdd
>>> ...
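handler.rdd is a regular Spark RDD, so you can inspect the decoded records directly. The continuation below is a sketch; the envelope field names (timestamp, host_ip, rawdata) match the JsonDataHandler example further down this page.
>>> rdd.count()                    # number of records under the selected path
>>> record = rdd.take(1)[0]        # each record is a dict decoded from the PNDA Avro schema
>>> record['timestamp'], record['host_ip']   # ingest metadata added by the platform
>>> record['rawdata']              # the producer payload; left undecoded by SimpleDataHandler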
If you need to install the library on other nodes (e.g. the gateway or Kafka nodes, which should generally be avoided), follow these steps to install it from source.
Once you have cloned the repository, run the build as follows:
sudo pip install -r requirements.txt
python setup.py bdist_egg
The build process generates an egg distribution at ./dist/platformlibs-<VERSION>-py2.7.egg.
Running the unit tests, either on a PNDA cluster or on a local machine, requires additional dependencies. Install them with:
sudo pip install -r test_requirements.txt
source install-spark.sh
and run the unit tests:
nosetests tests
Until this is automated, the following step is mandatory when deploying the egg: it must be installed on the Zeppelin node in the cluster using easy_install.
sudo easy_install /path/to/platformlibs-<VERSION>-py2.7.egg
The platform libraries load their configuration from /etc/platformlibs/platformlibs.ini, which holds the access details for Cloudera Manager. The default configuration is written during PNDA cluster provisioning. If the username or password has been changed, you will need to update the configuration file accordingly. An example configuration file is given below:
[cm]
cm_host=192.168.0.105
cm_user=admin
cm_pass=admin
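For reference, the sketch below shows how such an INI file can be read with Python 2.7's standard ConfigParser. This is an illustration of the file layout only, not the library's implementation; config_helper.get_config, documented below, does this for you.
>>> import ConfigParser
>>> parser = ConfigParser.ConfigParser()
>>> parser.read('/etc/platformlibs/platformlibs.ini')
['/etc/platformlibs/platformlibs.ini']
>>> dict(parser.items('cm'))   # the [cm] section holds the Cloudera Manager access details
{'cm_host': '192.168.0.105', 'cm_user': 'admin', 'cm_pass': 'admin'}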
In the Zeppelin/Jupyter PySpark interpreter settings, add:
--py-files=/path/to/platformlibs-<VERSION>-py2.7.egg
Note: you may need to restart the PySpark interpreter before using this library. If you are using this library in a standalone Spark application, ship the egg alongside your application.
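For example, a standalone application can be submitted with the egg distributed to the executors via --py-files (the application file name below is a placeholder):
spark-submit --master yarn-client --py-files /path/to/platformlibs-<VERSION>-py2.7.egg your_app.py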
Alternatively, you can use the egg from the PySpark interactive console or Zeppelin/Jupyter by injecting the library into the runtime, adding the following statement to your code:
sc.addPyFile('/path/to/platformlibs-<VERSION>-py2.7.egg')
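Putting it together, a notebook paragraph or interactive session might look like the following; the 'netflow' source and path reuse the JsonDataHandler example further down this page:
>>> sc.addPyFile('/path/to/platformlibs-<VERSION>-py2.7.egg')
>>> from platformlibs.json_data_handler import JsonDataHandler
>>> handler = JsonDataHandler(sc, 'netflow', 'year=2015/month=11/day=02/hour=11')
>>> handler.rdd.count()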
A collection of built-in helper functions:
- platformlibs.common_helpers.flatten_dict
- Description: return a flattened dictionary
- Args:
- input_d: read-only source dictionary
- result: output dictionary
>>> from platformlibs import common_helpers
>>> common_helpers.flatten_dict({'a':[{'b':1}, {'c':[{'d':2}]}]})
{'a-c-d': 2, 'a-b': 1}
- platformlibs.common_helpers.get_hdfs_uri
- Description: return a hdfs root path URI
- Args:
- hostname: cloudera manager host name of a PNDA cluster
>>> from platformlibs import common_helpers
>>> common_helpers.get_hdfs_uri("test-dev-cm", "admin", "admin")
hdfs://HDFS-HA
- platformlibs.config_helper.get_config
- Description: parse configuration file and return a configuration dictionary
- Args:
- filename: INI configuration file path
>>> from platformlibs import config_helper
>>> config_helper.get_config('/etc/platformlibs/platformlibs.ini')
{'cm_host':'pnda-cluster-cdh-cm', 'cm_user':'admin', 'cm_pass':'admin'}
- platformlibs.data_handler.DataHandler(spark_context, data_source, file_path, isTopic=False)
- Abstract DataHandler Class
- rdd
- Description: Decode avro datasets and return an RDD instance
- Args:
- datasource : data source type (e.g. netflow or telemetry)
- path: relative HDFS directory or file path
- isTopic: set to True if data source is actually topic name
- preprocess
- Description: Preprocessing abstract interface
- Args:
- raw_data: the input RDD
- list_metric_ids
- Description: abstract interface that returns a list of (<metric_id>, total_stats) pairs aggregated in descending order on a per-host basis
- Args:
- limit: the maximum number of metrics per host
- filters: filtering rules
- execute_query
- Description: return time-series data
- Args:
- filters: filtering rules
- platformlibs.simple_data_handler.SimpleDataHandler(spark_context, data_source, file_path, isTopic=False)
- Description: default data handler class that only loads data from HDFS as a Spark RDD (no business logic implemented)
- preprocess
- Description: decode Avro-formatted data
- Args:
- raw_data: the input RDD
e.g.
>>> from platformlibs.simple_data_handler import SimpleDataHandler
>>> handler = SimpleDataHandler(sc, 'test', 'year=2015/month=11/day=02/hour=11')
>>> rdd = handler.rdd
>>> rdd.count()
3451792
- platformlibs.json_data_handler.JsonDataHandler(spark_context, data_source, file_path, isTopic=False)
- Description: JSON data handler that assumes 'rawdata' is encoded in JSON format
- preprocess
- Description: decode Avro-formatted data and decode the JSON-formatted raw data value
- Args:
- raw_data: the input RDD
e.g.
>>> from platformlibs.json_data_handler import JsonDataHandler
>>> handler = JsonDataHandler(sc, 'netflow', 'year=2015/month=11/day=02/hour=11')
>>> rdd = handler.rdd
>>> rdd.take(1)
[{u'source': u'netflow', 'rawdata': {u'flow_sampler_id': u'0', u'in_bytes': u'46', u'protocol': u'6', u'first_switched': u'2015-11-02T10:59:44.999Z', u'flowset_id': u'256', u'ipv4_dst_addr': u'128.107.253.30', u'src_mask': u'0', u'src_as': u'0', u'ipv4_src_addr': u'64.39.105.23', u'last_switched': u'2015-11-02T10:59:44.999Z', u'version': u'9', u'flow_seq_num': u'1255692034', u'direction': u'0', u'l4_src_port': u'48965', u'ipv4_next_hop': u'128.107.80.58', u'dst_as': u'0', u'in_pkts': u'1', u'dst_mask': u'30', u'l4_dst_port': u'21064', u'output_snmp': u'3', u'src_tos': u'0', u'tcp_flags': u'0', u'input_snmp': u'106'}, u'timestamp': 1446462002000, u'host_ip': u'172.17.153.41'}]
- list_metric_ids
- Description: returns a list of (<metric_id>, total_stats) pairs aggregated in descending order on a per-metric basis
- Args:
- limit: the maximum number of metrics
- filters: filtering rules (a dictionary of key/value pairs used to evaluate the query) [NOTE: 'metrics' is the reserved key used to select a list of metrics by name.]
e.g. the filter definition below selects 'flow_seq_num' values from the router with IP address 128.107.253.30
filters = {'ipv4_dst_addr':'128.107.253.30', 'metrics':['flow_seq_num']}
SQL equivalent: SELECT count(flow_seq_num) FROM <table> WHERE ipv4_dst_addr='128.107.253.30'
>>> handler.list_metric_ids(filters={'ipv4_dst_addr':'128.107.253.30', 'metrics':['flow_seq_num']})
[(u'flow_seq_num', 246)]
- execute_query
- Description: return time-series data
- Args:
- filters: filtering rules [NOTE: in addition to 'metrics', two reserved filter keywords, 'start_ts' and 'end_ts', are used to define the query interval.]
e.g.
>>> handler.execute_query(filters={'ipv4_dst_addr':'128.107.253.30', 'start_ts':1446462002000, 'end_ts':1446462004000, 'metrics':['in_bytes']})
[(u'in_bytes', [(1446462002000, u'1616'), (1446462004000, u'60')], ...)]
- platformlibs.xr_data_handler.XrDataHandler(spark_context, data_source, file_path)
- Custom implementation of an XR telemetry data handler
- list_metric_ids
- Description: returns a list of (<metric_id>, total_stats) pairs aggregated in descending order on a per-host basis
- Args:
- limit: the maximum number of metrics per host
- filters: filtering rules [NOTE: an additional filter key supported by this implementation is 'metric_type':'<mpls|ipsla|infra>']
>>> from platformlibs.xr_data_handler import XrDataHandler
>>> handler = XrDataHandler(sc, 'telemetry', 'year=2015/month=08/day=14/hour=06', 'test-dev-cm', 'admin', 'admin')
>>> handler.list_metric_ids(limit=1, filters={'host':'192.168.0.4', 'metric_type':'ipsla'})
[(u'ipslastats.Aggregated.54.NonDistributed.Target.SpecificStats.UDPJitterStats.PacketLossDS', 402)]
>>> handler.list_metric_ids(limit=1, filters={'host':'192.168.0.4', 'metric_type':'mpls'})
[(u'mplstesummary.BidirTailLSPs', 67)]
>>> handler.list_metric_ids(limit=1, filters={'host_ips':['192.168.0.4'], 'metric_type':'infra'})
[(u'ribprefix.RoutePath-Flags64', 2010)]
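Taken together, a typical exploration workflow with the JSON handler combines the calls documented above: first rank the metrics seen for a host or address, then pull the time series for the ones of interest. The sketch below reuses the netflow example values; outputs are omitted.
>>> from platformlibs.json_data_handler import JsonDataHandler
>>> handler = JsonDataHandler(sc, 'netflow', 'year=2015/month=11/day=02/hour=11')
>>> top = handler.list_metric_ids(limit=5, filters={'ipv4_dst_addr': '128.107.253.30'})
>>> metric_names = [metric_id for (metric_id, total) in top]
>>> series = handler.execute_query(filters={'ipv4_dst_addr': '128.107.253.30', 'start_ts': 1446462002000, 'end_ts': 1446462004000, 'metrics': metric_names})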
By default, the platform libraries assume the raw event data are encoded in JSON format. If your data use a different encoding, you will need to provide your own implementation.
Two options are available:
- Provide a custom 'preprocess' implementation that renders your raw data as JSON-style dictionaries. With this option, you can reuse the operations implemented by default (see the sketch below).
- Provide a custom implementation of the operational interfaces for your custom data format.
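As an illustration of the first option, the sketch below subclasses JsonDataHandler and overrides preprocess so that a hypothetical CSV-encoded 'rawdata' payload is converted into dictionaries before the default operations run. The CSV field layout and the helper name are assumptions; check the JsonDataHandler source for the exact contract your preprocess must satisfy.

from platformlibs.json_data_handler import JsonDataHandler

class CsvDataHandler(JsonDataHandler):
    """Hypothetical handler for events whose 'rawdata' payload is a CSV line
    of the form 'in_bytes,in_pkts,l4_dst_port' (illustrative field layout)."""

    @staticmethod
    def _decode_rawdata(record):
        # 'record' is the Avro-decoded envelope: source, timestamp, host_ip, rawdata
        in_bytes, in_pkts, l4_dst_port = record['rawdata'].split(',')
        record['rawdata'] = {'in_bytes': in_bytes,
                             'in_pkts': in_pkts,
                             'l4_dst_port': l4_dst_port}
        return record

    def preprocess(self, raw_data):
        # raw_data is the RDD of Avro-decoded records; rewrite 'rawdata' into the
        # dictionary form that the default JSON-oriented operations expect
        return raw_data.map(CsvDataHandler._decode_rawdata)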