Allow user to remove broadcast variables when they are no longer used #771

Open · wants to merge 2 commits into base: master
Changes from 1 commit
25 changes: 22 additions & 3 deletions core/src/main/scala/spark/broadcast/BitTorrentBroadcast.scala
@@ -21,7 +21,8 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
def blockId: String = "broadcast_" + id

MultiTracker.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
// Let BlockManagerMaster know that we have the broadcast block, so it can later notify us to remove it.
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, true)
rxin (Member) commented:

There might be a performance problem here.

Spark actually uses a broadcast variable to broadcast the JobConf in HadoopRDD, to avoid having it in the task closure (10KB). If every worker has to send a message back to the master, it might slow things down, since every HadoopRDD we create will need to do that ...

Ideally, we should track the memory usage of broadcast variables, but I am not sure what the best way to do this is.

Author commented:

Thank you for reviewing my pull request, Reynold (@rxin). I have followed your advice, as below.

1. Change the names of the methods.
Answer: I renamed them in the last commit as you suggested.

2. Performance issues for HadoopRDD.
Answer: This is a valid concern. In practice, the BlockInfo message that gets transferred is not large. To make this flexible, I added a boolean parameter tellMaster (default true) to broadcast variables. When it is set to false, the broadcast variable is not reported to the master and cannot be removed on the slave machines within a SparkContext; this is suitable for small broadcast variables. Users can choose whichever behavior they want.

3. Track the memory usage of broadcasts.
Answer: This is a good idea, and it could lead to an automatic memory cleaner for broadcast variables. Nevertheless, the purpose of this patch is to give users an API for removing broadcasts, and the two do not conflict. For memory cleanup, the lesson I have learned is that no monitoring mechanism beats letting users clear memory explicitly when they can: GC is not always timely and has overhead, and in this case it is hard to determine automatically whether a broadcast is still needed. On the other hand, leaving large unused broadcast variables in memory is a real problem, and users currently have no way to address it. This patch therefore provides an explicit removal method; it has solved my problem in practice.

Thank you for commenting again. I will keep thinking about the automatic cleaner based on memory-usage tracking that you proposed here; it is an interesting idea.
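A minimal sketch of the usage pattern described in point 2. Note the assumptions: the two-argument sc.broadcast overload taking a tellMaster flag, and the names bigTable/jobConf, are illustrative only; this diff only threads the flag through BlockManager.putSingle.

// Hypothetical usage sketch; the sc.broadcast(value, tellMaster) overload is assumed.
val lookup = sc.broadcast(bigTable)        // reported to the master, removable later
val conf = sc.broadcast(jobConf, false)    // tellMaster = false: never reported,
                                           // no extra messages, but not removable
rdd.map(x => lookup.value(x)).count()
lookup.rm()                                // drop cached copies everywhere
// lookup.rm(true) would additionally clear the source-side data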

}

@transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -58,6 +59,23 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
if (!isLocal) {
sendBroadcast()
}

override def rm(toClearSource: Boolean = false) {
logInfo("Remove broadcast variable " + blockId)
SparkEnv.get.blockManager.master.removeBlock(blockId)
SparkEnv.get.blockManager.removeBlock(blockId, false)
if(toClearSource)
clearBlockSource()
}

def clearBlockSource(){
arrayOfBlocks = null
hasBlocksBitVector = null
numCopiesSent = null
listOfSources = null
serveMR = null
guideMR = null
}

def sendBroadcast() {
logInfo("Local host address: " + hostAddress)
@@ -116,7 +134,7 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
MultiTracker.synchronized {
SparkEnv.get.blockManager.getSingle(blockId) match {
SparkEnv.get.blockManager.getSingleLocal(blockId) match {
case Some(x) =>
value_ = x.asInstanceOf[T]

@@ -139,8 +157,9 @@ private[spark] class BitTorrentBroadcast[T](@transient var value_ : T, isLocal:
val receptionSucceeded = receiveBroadcast(id)
if (receptionSucceeded) {
value_ = MultiTracker.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
// Let BlockManagerMaster know that we have the broadcast block, so it can later notify us to remove it.
SparkEnv.get.blockManager.putSingle(
blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
blockId, value_, StorageLevel.MEMORY_AND_DISK, true)
} else {
logError("Reading broadcast variable " + id + " failed")
}
4 changes: 4 additions & 0 deletions core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -12,6 +12,10 @@ abstract class Broadcast[T](private[spark] val id: Long) extends Serializable {
// readObject having to be 'private' in sub-classes.

override def toString = "spark.Broadcast(" + id + ")"

// Remove a Broadcast block from the SparkContext and the Executors that have it.
// Set toClearSource to true to also remove the Broadcast value from its source.
def rm(toClearSource: Boolean)
}

private[spark]
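For orientation, the three overrides of rm elsewhere in this diff all follow the same shape; the following is a paraphrased sketch summarizing the BitTorrent/HTTP/Tree implementations, not new API:

override def rm(toClearSource: Boolean = false) {
  // 1. Ask the master to forget the block and notify the executors holding it.
  SparkEnv.get.blockManager.master.removeBlock(blockId)
  // 2. Drop the local copy without re-reporting the removal (tellMaster = false).
  SparkEnv.get.blockManager.removeBlock(blockId, false)
  // 3. Optionally release implementation-specific source state: in-memory block
  //    arrays for the tree/BitTorrent variants, the on-disk file for HTTP.
  if (toClearSource) { /* implementation-specific cleanup */ }
}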
20 changes: 17 additions & 3 deletions core/src/main/scala/spark/broadcast/HttpBroadcast.scala
@@ -21,24 +21,38 @@ extends Broadcast[T](id) with Logging with Serializable {
def blockId: String = "broadcast_" + id

HttpBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
// Let BlockManagerMaster know that we have the broadcast block, so it can later notify us to remove it.
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, true)
}

if (!isLocal) {
HttpBroadcast.write(id, value_)
}

override def rm(toClearSource: Boolean = false) {
logInfo("Remove broadcast variable " + blockId)
SparkEnv.get.blockManager.master.removeBlock(blockId)
SparkEnv.get.blockManager.removeBlock(blockId, false)
if(toClearSource){
Member commented: Add a space after if.

val path: String = HttpBroadcast.broadcastDir + "/" + "broadcast-" + id
HttpBroadcast.files.internalMap.remove(path)
new File(path).delete()
logInfo("Deleted source broadcast file '" + path + "'")
}
}

// Called by JVM when deserializing an object
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
HttpBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(blockId) match {
SparkEnv.get.blockManager.getSingleLocal(blockId) match {
case Some(x) => value_ = x.asInstanceOf[T]
case None => {
logInfo("Started reading broadcast variable " + id)
val start = System.nanoTime
value_ = HttpBroadcast.read[T](id)
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
// Let BlockManagerMaster know that we have the broadcast block, so it can later notify us to remove it.
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, true)
val time = (System.nanoTime - start) / 1e9
logInfo("Reading broadcast variable " + id + " took " + time + " s")
}
20 changes: 18 additions & 2 deletions core/src/main/scala/spark/broadcast/TreeBroadcast.scala
@@ -18,7 +18,8 @@ extends Broadcast[T](id) with Logging with Serializable {
def blockId = "broadcast_" + id

MultiTracker.synchronized {
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, false)
// Let BlockManagerMaster know that we have the broadcast block, so it can later notify us to remove it.
SparkEnv.get.blockManager.putSingle(blockId, value_, StorageLevel.MEMORY_AND_DISK, true)
}

@transient var arrayOfBlocks: Array[BroadcastBlock] = null
@@ -46,6 +47,21 @@ extends Broadcast[T](id) with Logging with Serializable {
if (!isLocal) {
sendBroadcast()
}

override def rm(toClearSource: Boolean = false) {
Member commented: Can we rename this function to remove, and toClearSource to releaseSource?

logInfo("Remove broadcast variable " + blockId)
SparkEnv.get.blockManager.master.removeBlock(blockId)
SparkEnv.get.blockManager.removeBlock(blockId, false)
if(toClearSource)
Member commented: Use

if (toClearSource) {
  clearBlockSource()
}

clearBlockSource()
}

def clearBlockSource(){
arrayOfBlocks = null
listOfSources = null
serveMR = null
guideMR = null
}

def sendBroadcast() {
logInfo("Local host address: " + hostAddress)
@@ -92,7 +108,7 @@ extends Broadcast[T](id) with Logging with Serializable {
private def readObject(in: ObjectInputStream) {
in.defaultReadObject()
MultiTracker.synchronized {
SparkEnv.get.blockManager.getSingle(blockId) match {
SparkEnv.get.blockManager.getSingleLocal(blockId) match {
case Some(x) =>
value_ = x.asInstanceOf[T]

7 changes: 7 additions & 0 deletions core/src/main/scala/spark/storage/BlockManager.scala
@@ -772,6 +772,13 @@ private[spark] class BlockManager(
def getSingle(blockId: String): Option[Any] = {
get(blockId).map(_.next())
}

/**
* Read a block consisting of a single object only from local BlockManager.
*/
def getSingleLocal(blockId: String): Option[Any] = {
getLocal(blockId).map(_.next())
}

/**
* Write a block consisting of a single object.
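A sketch of why the readObject paths above switch from getSingle to getSingleLocal. The rationale is inferred from the surrounding code, not stated in the diff: getSingle may fall through to a remote fetch via the BlockManager, whereas a deserializing broadcast should only check its local store and otherwise re-fetch through the broadcast transport itself.

// Sketch of the intended read path during broadcast deserialization.
SparkEnv.get.blockManager.getSingleLocal(blockId) match {
  case Some(x) => value_ = x.asInstanceOf[T]   // local cache hit, nothing to fetch
  case None =>
    // Cache miss: fetch through the broadcast mechanism (HTTP, tree, or
    // BitTorrent), then putSingle with tellMaster = true as above.
}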
1 change: 1 addition & 0 deletions examples/src/main/scala/spark/examples/BroadcastTest.scala
@@ -26,6 +26,7 @@ object BroadcastTest {
sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size)
}
barr1.rm(true)
}

System.exit(0)
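A variant of the test exercising the milder cleanup path could look like this (sketch; barr2 is a hypothetical second variable, not part of this diff):

val barr2 = sc.broadcast(arr1)
sc.parallelize(1 to 10, slices).foreach(i => println(barr2.value.size))
barr2.rm()   // removes cached copies on the master and executors, but keeps
             // the source data, since toClearSource defaults to false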