Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Code work by ZhangYinHan #1246

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public static PartitionGetResult partitionGet(PSContext psContext, PartitionGetP
MatrixMeta meta = psContext.getMatrixMetaManager().getMatrixMeta(param.getMatrixId());

try {
System.out.println("sssaaaa")
return new PartGeneralGetResult(meta.getValueClass(), nodeIds, data);
} catch (ClassNotFoundException e) {
throw new AngelException("Can not get value class ");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package struct2vec

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
 * Alias-method tables for constant-time sampling from a discrete distribution.
 *
 * A table is a pair `(accept, alias)` of equal length: slot `i` is kept with
 * probability `accept(i)` and otherwise redirected to `alias(i)`.
 */
object Alias_table {

  /**
   * Builds one alias table per source node from its weighted neighbor list.
   *
   * @param partId partition index; not read by this method
   * @param iter   pairs of (source node, array of (neighbor, weight))
   * @return for every input pair: (source node, neighbors, accept, alias), where
   *         `accept`/`alias` are indexed in the same order as `neighbors`
   */
  def calcAliasTable(partId: Int, iter: Iterator[(Long, Array[(Long, Float)])]): Iterator[(Long, Array[Long], Array[Float], Array[Int])] = {
    iter.map { case (src, neighbors) =>
      val (events, weights) = neighbors.unzip
      val weightsSum = weights.sum
      val len = weights.length
      // Scale the weights so that they average to 1 (they sum to `len`).
      val areaRatio = weights.map(_ / weightsSum * len)
      val (accept, alias) = createAliasTable(areaRatio)
      (src, events, accept, alias)
    }
  }

  /**
   * Builds the accept/alias arrays from weights that were scaled to average 1.
   *
   * The input array is not modified.
   *
   * @param areaRatio scaled weights, one per outcome
   * @return `(accept, alias)`, both of length `areaRatio.length`
   */
  def createAliasTable(areaRatio: Array[Float]): (Array[Float], Array[Int]) = {
    val len = areaRatio.length
    // Work on a private copy: the pairing loop below rewrites the remaining mass
    // of "large" slots, and the caller's array must stay intact.
    val remaining = areaRatio.clone()
    val small = ArrayBuffer[Int]()
    val large = ArrayBuffer[Int]()
    val accept = Array.fill(len)(0f)
    // Every slot aliases itself by default. Slots that end up with accept == 1
    // then stay correct even when the sampler draws v == 1.0f (a Double just
    // below 1 can round up to 1.0f), which makes `v < accept(id)` false.
    val alias = Array.range(0, len)

    for (idx <- remaining.indices) {
      if (remaining(idx) < 1.0) small.append(idx) else large.append(idx)
    }

    // Pair one under-full slot with one over-full slot until one side runs out.
    while (small.nonEmpty && large.nonEmpty) {
      val smallIndex = small.remove(small.size - 1)
      val largeIndex = large.remove(large.size - 1)
      accept(smallIndex) = remaining(smallIndex)
      alias(smallIndex) = largeIndex
      // The large slot donates what the small slot was missing.
      remaining(largeIndex) = remaining(largeIndex) - (1 - remaining(smallIndex))
      if (remaining(largeIndex) < 1.0) small.append(largeIndex) else large.append(largeIndex)
    }

    // Whatever is left on either side is accepted unconditionally.
    small.foreach { i => accept(i) = 1f }
    large.foreach { i => accept(i) = 1f }

    (accept, alias)
  }

  /**
   * Draws `sampleNum` indices from the distribution encoded by an alias table.
   *
   * Each draw consumes one `nextInt` followed by one `nextDouble` from `rand`.
   *
   * @param rand      random source
   * @param accept    acceptance probability per slot
   * @param alias     fallback index per slot
   * @param sampleNum number of indices to draw
   * @return array of `sampleNum` sampled slot indices
   */
  def alias_sample(rand: Random, accept: Array[Float], alias: Array[Int], sampleNum: Int): Array[Int] = {
    val indices = new Array[Int](sampleNum)
    var i = 0
    while (i < sampleNum) {
      val id = rand.nextInt(accept.length)
      val v = rand.nextDouble().toFloat
      indices(i) = if (v < accept(id)) id else alias(id)
      i += 1
    }
    indices
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package struct2vec

import struct2vec.Alias_table.alias_sample

import scala.collection.mutable.ListBuffer
import scala.util.Random

/**
 * Biased random walker over a stack of graph layers.
 *
 * Nodes are addressed by dense index; `srcNodesArray(i)` is the original id of
 * the node with index `i`, and walks are reported in original ids.
 *
 * @param srcNodesArray original node id for every dense node index
 */
class BiasWalker (srcNodesArray: Array[Long]) {

  // Dense node index -> original node id.
  private val idx2Node: Array[Long] = srcNodesArray

  //  def simulate_walks(num_walks: Int,walk_length: Int,stay_prob: Double =0.3) ={
  //
  //
  //  }

  /**
   * Runs `num_walks` passes over `nodes`, starting one walk from every node in
   * each pass. The node order is reshuffled for every pass.
   *
   * @param nodes         dense indices of the start nodes
   * @param num_walks     number of passes over `nodes`
   * @param walk_length   number of nodes per walk
   * @param stay_prob     probability of stepping inside the current layer
   * @param layers_adj    neighbor indices, addressed as (layer)(node)
   * @param layers_accept alias-table accept values, addressed as (layer)(node)
   * @param layers_alias  alias-table alias values, addressed as (layer)(node)
   * @param gamma         per-layer, per-node value that drives the move-up probability
   * @return `num_walks * nodes.length` walks, each a list of original node ids
   */
  def _simulate_walks(nodes: Array[Int], num_walks: Int, walk_length: Int, stay_prob: Double,
                      layers_adj: Array[Array[Array[Int]]], layers_accept: Array[Array[Array[Float]]],
                      layers_alias: Array[Array[Array[Int]]], gamma: Array[Array[Float]]): Array[ListBuffer[Long]] = {
    (0 until num_walks).flatMap { _ =>
      // Random.shuffle returns a new collection; its result has to be used.
      Random.shuffle(nodes.toSeq).map { node =>
        exec_random_walk(layers_adj, layers_accept, layers_alias, node, walk_length, gamma, stay_prob)
      }
    }.toArray
  }

  /**
   * Performs a single walk that starts at `node` in layer 0.
   *
   * At every iteration the walker either samples a neighbor in the current
   * layer (probability `stay_prob`) and records it, or tries to change layer
   * without recording anything.
   *
   * @return the visited nodes as original ids, starting with `node`
   */
  def exec_random_walk(graphs: Array[Array[Array[Int]]], layers_accept: Array[Array[Array[Float]]], layers_alias: Array[Array[Array[Int]]]
                       , node: Int, walk_length: Int, gamma: Array[Array[Float]], stay_prob: Double): ListBuffer[Long] = {
    // Only same-layer steps extend the path, so a non-positive stay_prob would loop forever.
    require(walk_length <= 1 || stay_prob > 0, "stay_prob must be positive when walk_length > 1")

    val initLayer = 0
    var layer = initLayer
    var current = node
    val path = ListBuffer[Long](idx2Node(current))

    while (path.length < walk_length) {
      if (Random.nextFloat() < stay_prob) {
        // Same layer: step to a sampled neighbor.
        current = ChooseNeigbor(current, graphs, layers_alias, layers_accept, layer)
        path.append(idx2Node(current))
      } else {
        // Different layer. Up-weight is log(gamma + e) against a down-weight of 1;
        // adding e keeps the weight >= 1 even when gamma is 0.
        // NOTE(review): the original expression was log(gamma) + 1 - confirm the intended formula.
        val x = math.log(gamma(layer)(current) + math.E)
        val pMoveUp = x / (x + 1) // probability of moving up one layer

        // Draw again: the first draw is already known to be >= stay_prob and
        // would bias the up/down decision if reused.
        if (Random.nextFloat() > pMoveUp) {
          if (layer > initLayer) layer -= 1 // move down
        } else if (hasNodeInLayer(graphs, layer + 1, current)) {
          layer += 1 // move up
        }
      }
    }
    path
  }

  /**
   * Samples one neighbor of `node` in `layer` through that node's alias table.
   *
   * @return dense index of the chosen neighbor
   */
  def ChooseNeigbor(node: Int, graphs: Array[Array[Array[Int]]], layers_alias: Array[Array[Array[Int]]],
                    layers_accept: Array[Array[Array[Float]]], layer: Int): Int = {
    val node_list = graphs(layer)(node)
    val sampled = alias_sample(Random, layers_accept(layer)(node), layers_alias(layer)(node), 1)
    node_list(sampled(0))
  }

  /** True when `layer` exists and `node` has at least one neighbor in it. */
  private def hasNodeInLayer(graphs: Array[Array[Array[Int]]], layer: Int, node: Int): Boolean =
    layer >= 0 && layer < graphs.length &&
      Option(graphs(layer)).exists { adj =>
        node >= 0 && node < adj.length && Option(adj(node)).exists(_.nonEmpty)
      }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package struct2vec

import fastdtwUtils.{ CostMatrix, IndexConstraints, MatrixEntry, PassthroughIndexConstraints, Space, TimeSeriesElement }

/**
* Computes a cumulative cost matrix between two series, using the distance
* supplied by `space`.
*
* @tparam T type of the value carried by each series element (its `v` field)
* @param space provides `distance` between two element values
*/
class DTW[T](space: Space[T]) {

/**
* Builds the cost matrix column by column: one column per element of `left`,
* one entry per element of `right` that `constraints` lets through.
*
* @param left series iterated on the column (i) axis
* @param right series iterated on the row (j) axis
* @param constraints restricts which j indices are evaluated for each column;
*                    defaults to a new `PassthroughIndexConstraints`
* @return `CostMatrix()` when either series matches `Nil`, otherwise a
*         `CostMatrix` built from the computed columns
*/
def costMatrix(left: Seq[TimeSeriesElement[T]], right: Seq[TimeSeriesElement[T]], constraints: IndexConstraints = new PassthroughIndexConstraints): CostMatrix =
(left, right) match {
case (Nil, _) | (_, Nil) => //one of the series is empty
CostMatrix()
case _ => CostMatrix({

// COST MATRIX:
//   5|_|_|_|_|_|_|E|   E = min Global Cost
//   4|_|_|_|_|_|_|_|   S = Start point
//   3|_|_|_|_|_|_|_|   each cell = min global cost to get to that point
// j 2|_|_|_|_|_|_|_|
//   1|_|_|_|_|_|_|_|
//   0|S|_|_|_|_|_|_|
//     0 1 2 3 4 5 6
//           i
// access is M(i,j)... column-row

// Accumulate the columns left to right; each step appends the column for curLeft.
left.foldLeft(Seq[Seq[MatrixEntry]]()) { (columns, curLeft: TimeSeriesElement[T]) =>
val i = columns.length //this is the x-index
// First component of the range reported for column i; used below as the j of the
// column's first entry. Presumably the lowest permitted j - confirm in IndexConstraints.
val (smallestIndex, _) = constraints.columnRange(i, right.length - 1)
// Presumably the elements of `right` permitted for column i - confirm in IndexConstraints.
val constrainedRight: Seq[TimeSeriesElement[T]] = constraints.mask(right, i)
columns :+ (
if (columns.isEmpty) {
//i = 0
//create a matrix entry for every j within the applied constraints
// Running sum along the first column: each entry adds its own distance to left.head
// onto the previous entry's value. The lambda parameter named curLeft shadows the
// outer one and is in fact an element of the right series.
constrainedRight.foldLeft(Seq[MatrixEntry]()) { (column: Seq[MatrixEntry], curLeft: TimeSeriesElement[T]) =>
// NOTE(review): for the very first entry the seed's second argument is smallestIndex,
// and the seed's value is what gets added below - confirm that is the intended initial cost.
val lastDistance = column.lastOption.getOrElse(MatrixEntry(i -> 0, smallestIndex))
column :+ MatrixEntry(i -> (smallestIndex + column.length), lastDistance.value + space.distance(left.head.v, curLeft.v))
}
} else {
//i != 0
val lastColumn: Seq[MatrixEntry] = constraints.constrain(columns.last, i) //i-1, *
// First entry of this column: the previous column's first value plus the local distance.
// NOTE(review): `.head` is taken on both lastColumn and constrainedRight, so an empty
// result from the constraints would throw here.
val bottomElement: MatrixEntry = MatrixEntry(i -> smallestIndex, lastColumn.head.value + space.distance(curLeft.v, constrainedRight.head.v))

// Wrap the previous column in Option and pad it with None so that the pairwise
// windows below do not run out before the right-hand elements do.
val filledOptionalLastColumn = lastColumn.map { v => Option(v) } ++ Seq.fill(constrainedRight.length)(None)
// Consecutive pairs of previous-column entries.
val slidingValues: Seq[Seq[Option[MatrixEntry]]] = filledOptionalLastColumn.sliding(2).toIndexedSeq
// Pair every remaining right element (the first one is covered by bottomElement)
// with its window of previous-column entries.
val neighbors: Seq[(TimeSeriesElement[T], Seq[Option[MatrixEntry]])] = constrainedRight.drop(1).zip(slidingValues)

//neighbors is an element in the right series with the cost matrix entries for the West and Southwest directions
neighbors.foldLeft(Seq(bottomElement)) { (column: Seq[MatrixEntry], neighborhood: (TimeSeriesElement[T], Seq[Option[MatrixEntry]])) =>

val j = column.length + smallestIndex
val curRight = neighborhood._1
val neighboringValues: Seq[Option[MatrixEntry]] = neighborhood._2

val costSouth: Double = column.last.value //i, j-1
// Cheapest predecessor among the entry below and whichever window entries are defined.
val minGlobalCost = (Seq(costSouth) ++ neighboringValues.flatten.map(_.value)).min

//add an entry to the current column (for curLeft) containing the total cost to consider curRight a match
column :+ MatrixEntry(i -> j, minGlobalCost + space.distance(curLeft.v, curRight.v))
}

})

}

// The columns are passed to CostMatrix as varargs.
}: _*)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package com.tencent.angel.graph.embedding.struct2vec

import com.tencent.angel.conf.AngelConf
import com.tencent.angel.graph.embedding.struct2vec.Struct2vec
import com.tencent.angel.graph.utils.GraphIO
import com.tencent.angel.spark.context.PSContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Local driver that runs Struct2vec on the karate-club edge list.
 */
object Example {

  /**
   * Starts a local Spark context, runs Struct2vec on the hard-coded input and
   * always shuts the contexts down afterwards, whether the run succeeds or fails.
   *
   * @param args command-line arguments; not read
   */
  def main(args: Array[String]): Unit = {
    val input = "data/bc/karate_club_network.txt"
    val storageLevel = StorageLevel.fromString("MEMORY_ONLY")
    val batchSize = 10
    val output = "data/output/output1"
    // NOTE(review): the next five values are declared but never handed to Struct2vec here.
    val stay_prob: Double = 0.3
    val opt1_reduce_len: Boolean = true
    val opt2_reduce_sim_calc: Boolean = false
    val opt3_num_layers: Int = 10
    val max_num_layers: Int = 10
    val srcIndex = 0
    val dstIndex = 1
    val weightIndex = 2
    val psPartitionNum = 1
    val partitionNum = 1
    val useEdgeBalancePartition = false
    val isWeighted = false
    val needReplicateEdge = true

    val sep = " "
    val walkLength = 10

    start()

    // Everything after start() runs under try/finally so that a failure while
    // loading or transforming does not leave the Spark context running.
    try {
      val struct2vec = new Struct2vec()
        .setStorageLevel(storageLevel)
        .setPSPartitionNum(psPartitionNum)
        .setSrcNodeIdCol("src")
        .setDstNodeIdCol("dst")
        .setWeightCol("weight")
        .setBatchSize(batchSize)
        .setWalkLength(walkLength)
        .setPartitionNum(partitionNum)
        .setIsWeighted(isWeighted)
        .setNeedReplicaEdge(needReplicateEdge)
        .setUseEdgeBalancePartition(useEdgeBalancePartition)
        .setEpochNum(2)

      struct2vec.setOutputDir(output)
      val df = GraphIO.load(input, isWeighted = isWeighted, srcIndex, dstIndex, weightIndex, sep = sep)
      struct2vec.transform(df)
    } finally {
      stop()
    }
  }

  /**
   * Creates the SparkContext used by the example.
   *
   * @param mode Spark master URL; defaults to "local[4]"
   */
  def start(mode: String = "local[4]"): Unit = {
    val conf = new SparkConf()
    conf.setMaster(mode)
    conf.setAppName("Struct2vec")
    conf.set(AngelConf.ANGEL_PSAGENT_UPDATE_SPLIT_ADAPTION_ENABLE, "false")
    val sc = new SparkContext(conf)
    sc.setLogLevel("ERROR")
    sc.setCheckpointDir("data/cp")
    //PSContext.getOrCreate(sc)
  }

  /**
   * Stops the PS context and then the Spark context. The Spark context is
   * stopped even if stopping the PS context throws.
   */
  def stop(): Unit = {
    try {
      PSContext.stop()
    } finally {
      SparkContext.getOrCreate().stop()
    }
  }

}

Loading