Merge pull request #36 from kbastani/1.1.1-RELEASE
Merging 1.1.1-RELEASE into master
kbastani committed Apr 21, 2015
2 parents 2b39b6b + fe63f36 commit 9bf86c5
Showing 18 changed files with 1,475 additions and 161 deletions.
23 changes: 18 additions & 5 deletions README.md
@@ -8,6 +8,8 @@ This docker image adds high-performance graph analytics to a [Neo4j graph databa

*Closeness Centrality*

*Betweenness Centrality*

*Triangle Counting*

*Connected Components*
@@ -23,13 +25,13 @@ The Neo4j Mazerunner service in this image is an [unmanaged extension](http://neo
Installation requires 3 docker image deployments, each containing a separate linked component.

* *Hadoop HDFS* (sequenceiq/hadoop-docker:2.4.1)
-* *Neo4j Graph Database* (kbastani/docker-neo4j:2.2.0)
+* *Neo4j Graph Database* (kbastani/docker-neo4j:2.2.1)
* *Apache Spark Service* (kbastani/neo4j-graph-analytics:1.1.0)

Pull the following docker images:

docker pull sequenceiq/hadoop-docker:2.4.1
-docker pull kbastani/docker-neo4j:2.2.0
+docker pull kbastani/docker-neo4j:2.2.1
docker pull kbastani/neo4j-graph-analytics:1.1.0

After each image has been downloaded to your Docker server, run the following commands in order to create the linked containers.
@@ -43,13 +45,13 @@ After each image has been downloaded to your Docker server, run the following co
# Create Neo4j database with links to HDFS and Mazerunner
# Replace <user> and <neo4j-path>
# with the location to your existing Neo4j database store directory
-docker run -d -P -v /Users/<user>/<neo4j-path>/data:/opt/data --name graphdb --link mazerunner:mazerunner --link hdfs:hdfs kbastani/docker-neo4j:2.2.0
+docker run -d -P -v /Users/<user>/<neo4j-path>/data:/opt/data --name graphdb --link mazerunner:mazerunner --link hdfs:hdfs kbastani/docker-neo4j:2.2.1

### Use Existing Neo4j Database

To use an existing Neo4j database, make sure that the database store directory, typically `data/graph.db`, is available on your host OS. Read the [setup guide](https://github.com/kbastani/docker-neo4j#start-neo4j-container) for *kbastani/docker-neo4j* for additional details.

-> Note: The kbastani/docker-neo4j:2.2.0 image is running Neo4j 2.2.0. If you point it to an older database store, that store may no longer be attachable to a previous version of Neo4j. Make sure you back up your store files before proceeding.
+> Note: The kbastani/docker-neo4j:2.2.1 image is running Neo4j 2.2.1. If you point it to an older database store, that store may no longer be attachable to a previous version of Neo4j. Make sure you back up your store files before proceeding.

### Use New Neo4j Database

@@ -69,6 +71,7 @@ Replace `{analysis}` in the endpoint with one of the following analysis algorith

- pagerank
- closeness_centrality
- betweenness_centrality
- triangle_count
- connected_components
- strongly_connected_components
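
For example, to start a PageRank job on the `FOLLOWS` relationship type (using the sample container address from this README; the IP and port will differ on your host):

    curl http://172.17.0.21:7474/service/mazerunner/analysis/pagerank/FOLLOWS
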
@@ -98,7 +101,7 @@ To begin graph analysis jobs on a particular metric, HTTP GET request on the fol

* PageRank is used to find the relative importance of a node within a set of connected nodes.
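
For reference, the standard PageRank recurrence (a textbook formulation, not quoted from this repository), with damping factor `d` over `N` nodes and `In(v)`/`Out(u)` the in- and out-neighbor sets, is:

    PR(v) = \frac{1 - d}{N} + d \sum_{u \in In(v)} \frac{PR(u)}{|Out(u)|}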

-### Closeness Centrality (New)
+### Closeness Centrality

http://172.17.0.21:7474/service/mazerunner/analysis/closeness_centrality/FOLLOWS

@@ -108,6 +111,16 @@ To begin graph analysis jobs on a particular metric, HTTP GET request on the fol

* A key node centrality measure in networks is closeness centrality (Freeman, 1978; Opsahl et al., 2010; Wasserman and Faust, 1994). It is defined as the inverse of farness, which, in turn, is the sum of distances to all other nodes.
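
In standard notation (a textbook definition, not quoted from this repository), the closeness centrality of a node `v`, with `d(v, u)` the shortest-path distance between `v` and `u`, is:

    C(v) = \frac{1}{\sum_{u \neq v} d(v, u)}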

### Betweenness Centrality

http://172.17.0.21:7474/service/mazerunner/analysis/betweenness_centrality/FOLLOWS

* Gets all nodes connected by the `FOLLOWS` relationship and updates each node with the property key `betweenness_centrality`.

* The value of the `betweenness_centrality` property is a float data type, e.g. `betweenness_centrality: 20.345`.

* Betweenness centrality is an indicator of a node's centrality in a network. It is equal to the number of shortest paths from all vertices to all others that pass through that node. A node with high betweenness centrality has a large influence on the transfer of items through the network, under the assumption that item transfer follows the shortest paths.
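
In standard notation (a textbook definition, not quoted from this repository), with `\sigma_{st}` the number of shortest paths from `s` to `t` and `\sigma_{st}(v)` the number of those paths that pass through `v`:

    g(v) = \sum_{s \neq v \neq t} \frac{\sigma_{st}(v)}{\sigma_{st}}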

### Triangle Counting

http://172.17.0.21:7474/service/mazerunner/analysis/triangle_count/FOLLOWS
Binary file not shown.
6 changes: 3 additions & 3 deletions src/extension/pom.xml
@@ -6,10 +6,10 @@

<groupId>org.mazerunner</groupId>
<artifactId>extension</artifactId>
-<version>1.1.0-RELEASE</version>
+<version>1.1.1-RELEASE</version>

<properties>
-<neo4j.version>2.2.0</neo4j.version>
+<neo4j.version>2.2.1</neo4j.version>
<joda.version>2.3</joda.version>
<guava.version>17.0</guava.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -30,7 +30,7 @@
<dependency>
<groupId>org.neo4j</groupId>
<artifactId>neo4j-kernel</artifactId>
-<version>2.2.0</version>
+<version>2.2.1</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
17 changes: 14 additions & 3 deletions src/spark/pom.xml
@@ -6,7 +6,7 @@

<groupId>org.mazerunner</groupId>
<artifactId>spark</artifactId>
-<version>1.1.0-RELEASE</version>
+<version>1.1.1-RELEASE</version>

<properties>
<jetty.version>7.6.9.v20130131</jetty.version>
@@ -75,7 +75,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
-<version>1.3.0</version>
+<version>1.3.1</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
@@ -86,7 +86,7 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.10</artifactId>
-<version>1.3.0</version>
+<version>1.3.1</version>
<exclusions>
<exclusion>
<groupId>ch.qos.logback</groupId>
@@ -172,6 +172,17 @@
<artifactId>gson</artifactId>
<version>2.3</version>
</dependency>
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.10</artifactId>
<version>2.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.mdr</groupId>
<artifactId>ascii-graphs_2.10</artifactId>
<version>0.0.6</version>
</dependency>
</dependencies>
<build>
<resources>
@@ -108,8 +108,8 @@ public void initialize() throws IOException {

public void initializeTest()
{
-hadoopSitePath = "/Users/kennybastani/hadoop-1.0.4/conf/core-site.xml";
-hadoopHdfsPath = "/Users/kennybastani/hadoop-1.0.4/conf/hdfs-site.xml";
+hadoopSitePath = "/hadoop-2.4.1/conf/core-site.xml";
+hadoopHdfsPath = "/hadoop-2.4.1/conf/hdfs-site.xml";
hadoopHdfsUri = "hdfs://0.0.0.0:9000";
mazerunnerRelationshipType = "CONNECTED_TO";
rabbitmqNodename = "localhost";
@@ -40,10 +40,10 @@ public class Worker {
private String sparkAppName = "mazerunner";

@Option(name="--spark.executor.memory",usage="Amount of memory to use per executor process, in the same format as JVM memory strings (e.g. 512m, 2g). ", metaVar = "<string>")
-private String sparkExecutorMemory = "512m";
+private String sparkExecutorMemory = "4092m";

@Option(name="--spark.master",usage="The Spark master URL (e.g. spark://localhost:7077).",metaVar="<url>")
-private String sparkMaster = "local";
+private String sparkMaster = "local[8]";

@Option(name="--hadoop.hdfs",usage="The HDFS URL (e.g. hdfs://0.0.0.0:9000).", metaVar = "<url>")
private String hadoopHdfs = "hdfs://0.0.0.0:9000";
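
A hypothetical invocation of the Spark service that overrides these defaults might look like the following (the jar name is illustrative only; it is not taken from this commit):

    java -jar mazerunner-spark-service.jar --spark.master local[8] --spark.executor.memory 4092m --hadoop.hdfs hdfs://0.0.0.0:9000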
@@ -34,6 +34,7 @@ public class GraphProcessor {
public static final String PAGERANK = "pagerank";
public static final String STRONGLY_CONNECTED_COMPONENTS = "strongly_connected_components";
public static final String CLOSENESS_CENTRALITY = "closeness_centrality";
public static final String BETWEENNESS_CENTRALITY = "betweenness_centrality";

public static JavaSparkContext javaSparkContext = null;

@@ -63,9 +64,13 @@ public static void processEdgeList(ProcessorMessage processorMessage) throws IOE
results = algorithms.stronglyConnectedComponents(javaSparkContext.sc(), processorMessage.getPath());
break;
case CLOSENESS_CENTRALITY:
-// Route to StronglyConnectedComponents
+// Route to ClosenessCentrality
results = algorithms.closenessCentrality(javaSparkContext.sc(), processorMessage.getPath());
break;
case BETWEENNESS_CENTRALITY:
// Route to BetweennessCentrality
results = algorithms.betweennessCentrality(javaSparkContext.sc(), processorMessage.getPath());
break;
default:
// Analysis does not exist
System.out.println("Did not recognize analysis key: " + processorMessage.getAnalysis());
@@ -0,0 +1,45 @@
package org.mazerunner.core.abstractions

import org.apache.spark.graphx._

import scala.reflect.ClassTag

/**
* The [[PregelProgram]] abstraction wraps Spark's Pregel API implementation from the [[GraphOps]]
* class into a model that makes graph algorithms easier to write.
* @tparam VertexState is the generic type representing the state of a vertex
*/
abstract class PregelProgram[VertexState: ClassTag, VD: ClassTag, ED: ClassTag] protected () extends Serializable {

@transient val graph: Graph[VD, ED]

/**
* The vertex program receives a state update and acts to update its state
* @param id is the [[VertexId]] that this program will perform a state operation for
* @param state is the current state of this [[VertexId]]
* @param message is the state received from another vertex in the graph
* @return a [[VertexState]] resulting from a comparison between current state and incoming state
*/
def vertexProgram(id : VertexId, state : VertexState, message : VertexState) : VertexState

/**
* The message broker sends and receives messages. It will initially receive one message for
* each vertex in the graph.
* @param triplet An edge triplet is an object containing a pair of connected vertex objects and an edge object.
* For example (v1)-[r]->(v2)
* @return The message broker returns a key value list, each containing a VertexId and a new message
*/
def messageBroker(triplet : EdgeTriplet[VertexState, ED]) : Iterator[(VertexId, VertexState)]

/**
* This method is used to reduce or combine the set of all state outcomes produced by a vertexProgram
* for each vertex in each superstep iteration. Each vertex has a list of state updates received from
* other vertices in the graph via the messageBroker method. This method is used to reduce the list
* of state updates into a single state for the next superstep iteration.
* @param a A first [[VertexState]] representing a partial state of a vertex.
* @param b A second [[VertexState]] representing a different partial state of a vertex
* @return a merged [[VertexState]] representation from the two [[VertexState]] parameters
*/
def combiner(a: VertexState, b: VertexState) : VertexState

}
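
As a usage sketch (hypothetical; no such subclass ships in this commit), a concrete program might propagate the minimum VertexId to every reachable vertex:

    import org.apache.spark.graphx._
    import org.mazerunner.core.abstractions.PregelProgram

    // Hypothetical example: VertexState = VD = Long, ED = Int.
    class MinimumIdProgram(@transient override val graph: Graph[Long, Int])
      extends PregelProgram[Long, Long, Int] {

      // Keep the smaller of the current state and the incoming message
      override def vertexProgram(id: VertexId, state: Long, message: Long): Long =
        math.min(state, message)

      // Send a message only when the source can improve the destination's state
      override def messageBroker(triplet: EdgeTriplet[Long, Int]): Iterator[(VertexId, Long)] =
        if (triplet.srcAttr < triplet.dstAttr) Iterator((triplet.dstId, triplet.srcAttr))
        else Iterator.empty

      // Merge concurrent messages by taking the minimum
      override def combiner(a: Long, b: Long): Long = math.min(a, b)
    }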