[SPARK-42856][GRAPHX] Break tie with highest vertex id in label propagation #48871

Open
wants to merge 1 commit into
base: master

awadhesh14

What changes were proposed in this pull request?

Update the tie-breaking logic in label propagation so that, when several labels have the same count, the label with the highest vertex id is chosen.

Why are the changes needed?

This fixes a bug reported in SPARK-42856.

Does this PR introduce any user-facing change?

No

How was this patch tested?

./dev/run-tests runs successfully

Was this patch authored or co-authored using generative AI tooling?

No

@zhengruifeng
Contributor

Shall we add a test for it?

@awadhesh14
Author

@zhengruifeng I am looking into creating a unit test case.

Meanwhile, a proposal: use only active vertices for label propagation in each superstep.
Here are the proposed changes to the methods:

// Updates the vertex value only if it has changed.

def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
  val newAttr = if (message.isEmpty) attr else message.maxBy(_._2)._1
  if (newAttr == attr) {
    attr // Vertex value remains unchanged
  } else {
    newAttr // Vertex value has changed
  }
}
// Sends messages only if the source and destination vertex values are different.

def sendMessage(e: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, Map[VertexId, Long])] = {
  if (e.srcAttr != e.dstAttr) {
    Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 1L)))
  } else {
    Iterator.empty
  }
}
// Adding `activeDirection = EdgeDirection.Either` to ensure only active vertices are processed.

val initialMessage = Map[VertexId, Long]()
Pregel(lpaGraph, initialMessage, maxIterations = maxSteps, activeDirection = EdgeDirection.Either)(
  vprog = vertexProgram,
  sendMsg = sendMessage,
  mergeMsg = mergeMessage)
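
The `mergeMessage` function passed as `mergeMsg` is not shown in this comment. A minimal sketch, assuming it simply sums the per-label counts of two messages; `VertexId` is aliased to `Long` here so the snippet runs without Spark, and the object name `MergeSketch` is purely illustrative:

```scala
object MergeSketch {
  // Assumption: stands in for org.apache.spark.graphx.VertexId, which is an alias for Long.
  type VertexId = Long

  // Sums the count of each label across two partial messages.
  // A label present in only one message keeps its count unchanged.
  def mergeMessage(
      count1: Map[VertexId, Long],
      count2: Map[VertexId, Long]): Map[VertexId, Long] = {
    (count1.keySet ++ count2.keySet).map { label =>
      label -> (count1.getOrElse(label, 0L) + count2.getOrElse(label, 0L))
    }.toMap
  }
}
```

With this shape, two neighbours that both report label `2` yield a merged count of `2` for that label, which is what `vertexProgram` then ranks.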

@zhengruifeng Can you review it?

@zhengruifeng
Contributor

@awadhesh14 The fix itself seems good: when multiple keys share the same maximum value, the result is not deterministic.

I think we should add an end-to-end test in LabelPropagationSuite.

The test could be the mini-reproducer of the issue described in https://issues.apache.org/jira/browse/SPARK-42856, with assertions on the results.
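
The tie-breaking rule discussed above can be sketched and checked without Spark. This is only an illustration of the idea, not necessarily the code in the PR diff: `pickLabel` and `TieBreakSketch` are hypothetical names, and `VertexId` is aliased to `Long` so the snippet is self-contained.

```scala
object TieBreakSketch {
  // Assumption: stands in for org.apache.spark.graphx.VertexId, which is an alias for Long.
  type VertexId = Long

  // Chooses the label with the highest count; among equally frequent
  // labels, the highest vertex id wins, so the result does not depend
  // on the iteration order of the map.
  // Precondition: counts is non-empty (maxBy throws on an empty collection).
  def pickLabel(counts: Map[VertexId, Long]): VertexId =
    counts.maxBy { case (label, count) => (count, label) }._1
}
```

For example, `TieBreakSketch.pickLabel(Map(1L -> 2L, 5L -> 2L, 3L -> 1L))` returns `5`, because labels `1` and `5` tie on count and `5` is the larger id. Comparing on the pair `(count, label)` instead of the count alone is what makes the choice deterministic.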
