Nutcat is a Pregel-like Graph Processing Framework implemented in Scala.
- Use SBT to pack the whole project for distribution.

      $ cd cat
      $ sbt
      $ clean
      $ pack
- Launch master and workers.

      $ cd target/pack/bin
      $ start_cat_master
      $ start_cat_worker
- You can use your own config file; pass `--conf <file>` to override some of the following entries.

      cat.master.actorsystem = "catMasterSystem"
      cat.master.actor = "master"
      cat.master.address = "127.0.0.1"
      cat.master.port = 8888
      cat.worker.actorsystem = "catWorkerSystem"
      cat.worker.actor = "worker"
      cat.worker.address = "127.0.0.1"
      cat.worker.port = 0
      cat.worker.registration.retries = 10
      cat.worker.registration.interval = 2
      cat.worker.timeout = 10
      cat.worker.masterakkaurls = "akka.tcp://[email protected]:8888/user/master"
      cat.executor.buffersize = 20000
      cat.executor.timeout = 10
      cat.ui.root = "127.0.0.1"
      cat.ui.port = 8080
      cat.ui.proxy.path = "http://127.0.0.1:8080"
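As an illustration of overriding only a few entries, a custom config file might look like the fragment below. This is a sketch, not a tested configuration: the file name `my_cat.conf` and the chosen values are hypothetical, and it assumes the file uses the same `key = value` syntax as the entries listed above and that entries left out keep their listed values.

```conf
# my_cat.conf (hypothetical file name): override only what differs
cat.ui.port = 9090
cat.ui.proxy.path = "http://127.0.0.1:9090"
cat.executor.buffersize = 50000
cat.worker.registration.retries = 20
```

Assuming the launch scripts accept the flag in the same way they accept `--help`, the file would be passed as `start_cat_master --conf my_cat.conf`.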
- You can use `--help` for more information.

      $ start_cat_master --help
- Then you can submit your own jar and execute your main class.

      $ submit_cat_job --jar <path_to_your_jar> --mainClass <main_class>
-
You should include
api_2.11-x.jar
andcommon_2.11-x.jar
into your class path. -
Use VertexData object.

      final case class VertexData[VERTEX, DATA](vertexId: VERTEX,
                                                vertexStatus: VertexStatus,
                                                data: DATA,
                                                connectedVertices: Set[VERTEX])

  `VertexData` is the API for a single vertex in the graph. It holds the vertex itself, its connected vertices, and the user-defined data.

  - `VERTEX`: the type of vertexId.
  - `DATA`: the type of user-defined data.
  - `vertexId`: the vertexId.
  - `vertexStatus`: the status of the vertex; if it is Active, the vertex participates in the superstep.
  - `data`: the user-defined data.
  - `connectedVertices`: the connected vertices.
- Define your own GraphLoader object.

      class SSSPGraphLoader extends GraphLoader[Long, Unit] {
        override def loadGraph(): Array[VertexData[Long, Unit]] = ???
      }

  The loader has to implement the `GraphLoader` trait:

      trait GraphLoader[VERTEX, DATA] {
        def loadGraph(): Array[VertexData[VERTEX, DATA]]
      }

  - `VERTEX`: the type of vertexId.
  - `DATA`: the type of user-defined data.

  The `loadGraph` function is the key to loading the graph.
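To make the loader contract concrete, here is a minimal sketch of a loader that returns a small hard-coded graph. It is self-contained, so it declares stand-ins for the Nutcat types: `VertexData` and `GraphLoader` follow the signatures shown above, while `VertexStatus` with its `Active` and `Inactive` values, the `TinyGraphLoader` class, and the demo object are assumptions made for illustration. In a real job you would import the framework's own types instead.

```scala
// Stand-ins so the sketch compiles on its own. VertexStatus and its
// Active/Inactive values are assumptions; VertexData and GraphLoader
// follow the signatures given in this README.
sealed trait VertexStatus
case object Active extends VertexStatus
case object Inactive extends VertexStatus

final case class VertexData[VERTEX, DATA](vertexId: VERTEX,
                                          vertexStatus: VertexStatus,
                                          data: DATA,
                                          connectedVertices: Set[VERTEX])

trait GraphLoader[VERTEX, DATA] {
  def loadGraph(): Array[VertexData[VERTEX, DATA]]
}

// Hypothetical loader: builds the graph from a hard-coded adjacency map.
class TinyGraphLoader extends GraphLoader[Long, Unit] {
  private val adjacency: Map[Long, Set[Long]] = Map(
    1L -> Set(2L, 3L),
    2L -> Set(8L),
    3L -> Set(8L),
    8L -> Set.empty[Long]
  )

  // One VertexData per vertex, ordered by id, all initially Active,
  // with no user-defined data (Unit).
  override def loadGraph(): Array[VertexData[Long, Unit]] =
    adjacency.toArray.sortBy(_._1).map { case (id, neighbours) =>
      VertexData[Long, Unit](id, Active, (), neighbours)
    }
}

object TinyGraphLoaderDemo {
  def main(args: Array[String]): Unit = {
    val vertices = new TinyGraphLoader().loadGraph()
    vertices.foreach { v =>
      println(s"${v.vertexId} -> ${v.connectedVertices.mkString(",")}")
    }
  }
}
```

A production loader would more likely read the vertices from a file or another external source; the hard-coded map only keeps the example short.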
- Define your own VertexComputer object.

      class SSSPVertexComputer extends VertexComputer[Long, Unit, SSSPMessage, String] with Logging {
        override def compute(superStepId: Long,
                             vertexData: VertexData[Long, Unit],
                             inMessageList: Array[(Long, SSSPMessage)],
                             globalMessage: Option[String],
                             context: VertexContext[Long, Unit, SSSPMessage, String]): Unit = ???
      }

  The VertexComputer has to implement the trait `VertexComputer[VERTEX, DATA, MSG, GMSG]`, which defines what each vertex has to do in each superstep.

  - `VERTEX`: the type of vertexId.
  - `DATA`: the type of user-defined data.
  - `MSG`: the type of message sent to other vertices.
  - `GMSG`: the type of message used for global computation.

  Now, we will give the SSSP implementation as an example:

      class SSSPVertexComputer extends VertexComputer[Long, Unit, SSSPMessage, String] with Logging {
        override def compute(superStepId: Long,
                             vertexData: VertexData[Long, Unit],
                             inMessageList: Array[(Long, SSSPMessage)],
                             globalMessage: Option[String],
                             context: VertexContext[Long, Unit, SSSPMessage, String]): Unit = {
          // Once a global message arrives, log it and halt.
          globalMessage match {
            case Some(message) =>
              context.writeNewVertexData(vertexData.voltToHalt)
              logInfo(message)
              return
            case _ =>
          }
          if (vertexData.vertexId == 1L) {
            // Source vertex: send the initial message to every neighbour.
            val message = SSSPMessage(1L, List(1L), 1)
            vertexData.connectedVertices.foreach { vertex =>
              context.writeMessage(message, vertex)
            }
          } else if (vertexData.vertexId == 8L) {
            // Target vertex: report the path carried by the first incoming message.
            // headOption avoids an exception when no message has arrived yet.
            inMessageList.headOption.foreach { case (_, message) =>
              val information = s"SSSP path: ${message.visited.mkString("->")}->8, with the length ${message.length + 1}"
              context.writeGlobalMessage(information)
            }
          } else if (inMessageList.nonEmpty) {
            // Intermediate vertex: keep the shortest incoming path and forward it, extended by this vertex.
            val theMessage = inMessageList.reduce((m1, m2) => if (m1._2.length < m2._2.length) m1 else m2)
            vertexData.connectedVertices.foreach { connectedVertex =>
              context.writeMessage(
                SSSPMessage(theMessage._2.startsFrom, theMessage._2.visited :+ vertexData.vertexId, theMessage._2.length + 1),
                connectedVertex)
            }
          }
          context.writeNewVertexData(vertexData.voltToHalt)
        }
      }
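The example uses `SSSPMessage` without showing its definition. Judging only from how it is constructed and read above, it appears to carry a start vertex, the list of visited vertices, and a path length. The sketch below is an assumption along those lines, not the project's actual class: the field types and the helper names `shortest` and `extend` are hypothetical. It isolates the two message operations that an intermediate vertex performs, so they can be tried outside the framework.

```scala
// Assumed shape of SSSPMessage, inferred from its uses in the example
// (field names match the example; the Int type of `length` is a guess).
final case class SSSPMessage(startsFrom: Long, visited: List[Long], length: Int)

object SSSPMessageDemo {
  // Keep the incoming message with the smallest path length,
  // using the same comparison as the reduce in the example.
  def shortest(in: Array[(Long, SSSPMessage)]): Option[SSSPMessage] =
    in.map(_._2).reduceOption((m1, m2) => if (m1.length < m2.length) m1 else m2)

  // Extend a path by the current vertex before forwarding it.
  def extend(message: SSSPMessage, vertexId: Long): SSSPMessage =
    message.copy(visited = message.visited :+ vertexId, length = message.length + 1)

  def main(args: Array[String]): Unit = {
    val incoming = Array(
      (2L, SSSPMessage(1L, List(1L, 2L), 2)),
      (3L, SSSPMessage(1L, List(1L), 1))
    )
    shortest(incoming).foreach(best => println(extend(best, 3L)))
  }
}
```

Using `reduceOption` rather than `reduce` means an empty message list yields `None` instead of throwing, which plays the same role as the emptiness check in the example.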
- Define your AggregatorComputer object; it has to implement the trait `AggregatorComputer[VERTEX, GMSG]`.

      class SSSPAggregatorComputer extends AggregatorComputer[Long, String] {
        override def compute(superStepId: Long,
                             inMessageList: Array[(Long, String)]): Option[(Long, String)] = ???
      }

  Now, we just use part of the SSSP implementation as an example.

      class SSSPAggregatorComputer extends AggregatorComputer[Long, String] {
        // Return the first global message, or None if no vertex sent one.
        override def compute(superStepId: Long,
                             inMessageList: Array[(Long, String)]): Option[(Long, String)] =
          inMessageList.headOption
      }
- Write a main class to submit the job.

      GenericOptionsParser.parse(args) match {
        case None =>
          logError("Error")
        case Some((conf: JobConf, otherArgs: Map[String, String])) =>
          println(otherArgs.mkString("-"))
          val job = CatJob.createJob(conf, "SSSP Job",
            Some((new HashPartitionStrategy[Long]).getClass),
            Some((new SSSPGraphLoader).getClass),
            Some((new SSSPVertexComputer).getClass),
            Some((new SSSPAggregatorComputer).getClass))
          CatJob.submitJob(job)
      }

  Use the `CatJob.createJob` function to create the job.
- Pack your program into a Jar file and submit your job.
- Jiahui Jin ([email protected]), supervisor of this project, who gives much constructive advice on how to implement this project.
- Boyang Fan ([email protected]), artist of this project, who designs the impressive nutcat logo.
You can send me emails at: [[email protected]](mailto: [email protected])
Files
- Total: 57 files
- Scala: 56 files (98.2%)
- Java: 1 files (1.8%)
- Total size: 154,280 Bytes
- Avg size: 16,983 Bytes
- Avg length: 483 lines
Lines
- Total: 4,270 lines
- Code: 2,507 lines (58.7%)
- Comment: 1,183 lines (27.7%)
- Blank: 580 lines (13.6%)
- Bracket: 371 lines (8.7%)
Characters
- Total: 133,708 chars
- Code: 95,006 chars (71.1%)
- Comment: 38,702 chars (28.9%)