diff --git a/core/src/main/scala/spark/RDD.scala b/core/src/main/scala/spark/RDD.scala
index f336c2ea1e..c1636e25a4 100644
--- a/core/src/main/scala/spark/RDD.scala
+++ b/core/src/main/scala/spark/RDD.scala
@@ -634,14 +634,21 @@ abstract class RDD[T: ClassManifest](
    * allocation.
    */
   def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
-    // Clone the zero value since we will also be serializing it as part of tasks
-    var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
+    var jobResult = Option.empty[U]
     val cleanSeqOp = sc.clean(seqOp)
     val cleanCombOp = sc.clean(combOp)
-    val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
-    val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
+    def optCombOp(a: Option[U], b: Option[U]): Option[U] = (a, b) match {
+      case (None, None) => Option(zeroValue)
+      case (None, _) => b
+      case (Some(u1), Some(u2)) => Option(combOp(u1, u2))
+      case (_, _) => throw new SparkException("Have a jobResult but no taskResult in aggregate()")
+    }
+    val aggregatePartition =
+      (it: Iterator[T]) => Option(it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp))
+    val mergeResult =
+      (index: Int, taskResult: Option[U]) => jobResult = optCombOp(jobResult, taskResult)
     sc.runJob(this, aggregatePartition, mergeResult)
-    jobResult
+    jobResult.get
   }
 
   /**