diff --git a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala index 507c23b..e4548b7 100644 --- a/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala +++ b/src/main/scala/com/qubole/sparklens/QuboleJobListener.scala @@ -20,12 +20,14 @@ package com.qubole.sparklens import java.net.URI import com.qubole.sparklens.analyzer._ +import com.qubole.sparklens.autoscaling.{AutoscalingPolicy} import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo} import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkConf import org.apache.spark.scheduler._ +import org.slf4j.LoggerFactory import scala.collection.mutable import scala.collection.mutable.ListBuffer @@ -47,6 +49,9 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { protected val stageIDToJobID = new mutable.HashMap[Int, Long] protected val failedStages = new ListBuffer[String] protected val appMetrics = new AggregateMetrics() + private var autoscalingPolicy: Option[AutoscalingPolicy] = None + private val log = LoggerFactory.getLogger(classOf[QuboleJobListener]) + private def hostCount():Int = hostMap.size @@ -141,10 +146,22 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = { //println(s"Application ${applicationStart.appId} started at ${applicationStart.time}") + autoscalingPolicy = getAutoScalingPolicy() appInfo.applicationID = applicationStart.appId.getOrElse("NA") appInfo.startTime = applicationStart.time } + def getAutoScalingPolicy(): Option[AutoscalingPolicy] = { + AutoscalingPolicy.init(sparkConf) match { + case Some(autoscalingSparklensClient) => + log.info(s"Autoscaling client for sparklens exists = ${autoscalingSparklensClient}," + + s" will generate sparklens autoscaling policy") + Some(new AutoscalingPolicy(autoscalingSparklensClient, sparkConf)) + case None => + None + } + } + override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = { stageMap.map(x => x._2).foreach( x => x.tempTaskTimes.clear()) //println(s"Application ${appInfo.applicationID} ended at ${applicationEnd.time}") @@ -170,6 +187,7 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { } } override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + autoscalingPolicy.map(_.onExecutorAdded(executorAdded)) val executorTimeSpan = executorMap.get(executorAdded.executorId) if (!executorTimeSpan.isDefined) { val timeSpan = new ExecutorTimeSpan(executorAdded.executorId, @@ -187,12 +205,14 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { } override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + autoscalingPolicy.map(_.onExecutorRemoved(executorRemoved)) val executorTimeSpan = executorMap(executorRemoved.executorId) executorTimeSpan.setEndTime(executorRemoved.time) //We don't get any event for host. 
Will not try to check when the hosts go out of service } override def onJobStart(jobStart: SparkListenerJobStart) { + autoscalingPolicy.map(_.scale(jobStart)) val jobTimeSpan = new JobTimeSpan(jobStart.jobId) jobTimeSpan.setStartTime(jobStart.time) jobMap(jobStart.jobId) = jobTimeSpan @@ -202,6 +222,7 @@ class QuboleJobListener(sparkConf: SparkConf) extends SparkListener { } override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = { + autoscalingPolicy.map(_.scale(jobEnd)) val jobTimeSpan = jobMap(jobEnd.jobId) jobTimeSpan.setEndTime(jobEnd.time) //if we miss cleaing up tasks at end of stage, clean them after end of job diff --git a/src/main/scala/com/qubole/sparklens/analyzer/AppAnalyzer.scala b/src/main/scala/com/qubole/sparklens/analyzer/AppAnalyzer.scala index 9112bc7..e6a6598 100644 --- a/src/main/scala/com/qubole/sparklens/analyzer/AppAnalyzer.scala +++ b/src/main/scala/com/qubole/sparklens/analyzer/AppAnalyzer.scala @@ -20,6 +20,7 @@ import java.util.Date import java.util.concurrent.TimeUnit import com.qubole.sparklens.common.AppContext +import org.apache.spark.SparkConf import scala.collection.mutable.ListBuffer @@ -84,6 +85,7 @@ object AppAnalyzer { list += new EfficiencyStatisticsAnalyzer list += new ExecutorWallclockAnalyzer list += new StageSkewAnalyzer + list += new AutoscaleAnalyzer list.foreach( x => { diff --git a/src/main/scala/com/qubole/sparklens/analyzer/AutoscaleAnalyzer.scala b/src/main/scala/com/qubole/sparklens/analyzer/AutoscaleAnalyzer.scala new file mode 100644 index 0000000..52329f6 --- /dev/null +++ b/src/main/scala/com/qubole/sparklens/analyzer/AutoscaleAnalyzer.scala @@ -0,0 +1,78 @@ +package com.qubole.sparklens.analyzer +import com.qubole +import com.qubole.sparklens.autoscaling.{AutoscalingPolicy} +import com.qubole.sparklens.chart.{Graph, Point} +import com.qubole.sparklens.common.AppContext +import org.apache.spark.SparkConf + +class AutoscaleAnalyzer(conf: SparkConf = new SparkConf) extends AppAnalyzer { + val random = scala.util.Random + + override def analyze(ac: AppContext, startTime: Long, endTime: Long): String = { + println("================== AutoScale Analyze ==================") + val dimensions = qubole.sparklens.autoscaleGraphDimensions(conf) + val coresPerExecutor = ac.executorMap.values.map(x => x.cores).sum / ac.executorMap.size + println(s"cores per executor = ${coresPerExecutor}") + val originalGraph = createGraphs(dimensions, ac, coresPerExecutor) + + "" + } + + private def createGraphs(dimensions: List[Int], ac: AppContext, coresPerExecutor: Int): Graph = { + + val graph = new Graph(dimensions.head, dimensions.last) + createActualExecutorGraph(ac, graph, 'o') + createIdealPerJob(ac, graph, '*', coresPerExecutor) + + val realDuration = ac.appInfo.endTime - ac.appInfo.startTime + println(s"\n\nTotal app duration = ${pd(realDuration)}") + + println(s"Maximum concurrent executors = ${graph.getMaxY()}") + println(s"coresPerExecutor = ${coresPerExecutor}") + println(s"\n\nIndex:\noooooo --> Actual number of executors") + println("****** --> Ideal number of executors which would give same timelines") + + graph.plot('o', '*') + graph + } + + private def createActualExecutorGraph(appContext: AppContext, graph: Graph, graphIndex: Char) + : Unit = { + val sorted = AppContext.getSortedMap(appContext.executorMap, appContext) + graph.addPoint(Point(appContext.appInfo.startTime, 0, graphIndex)) // start point + var count: Int = 0 + sorted.map(x => { + count += x._2.asInstanceOf[Int] + graph.addPoint(Point(x._1, count, graphIndex)) + }) + 
graph.addPoint(Point(appContext.appInfo.endTime, 0, graphIndex)) + } + + private def createIdealPerJob(ac: AppContext, graph: Graph, graphIndex: Char, + coresPerExecutor: Int): Unit = { + val maxConcurrentExecutors = AppContext.getMaxConcurrent(ac.executorMap, ac) + + graph.addPoint(Point(ac.appInfo.startTime, 0, graphIndex)) + var lastJobEndTime = ac.appInfo.startTime + + ac.jobMap.values.toSeq.sortWith(_.startTime < _.startTime).foreach(jobTimeSpan => { + val optimalExecutors = jobTimeSpan.optimumNumExecutorsForJob(coresPerExecutor, + maxConcurrentExecutors.asInstanceOf[Int]) + + // first driver time when no jobs have run + if (lastJobEndTime == ac.appInfo.startTime) graph.addPoint(Point(jobTimeSpan.startTime, 0, + graphIndex)) + + // If time gap between this job and last job is large + if (jobTimeSpan.startTime - lastJobEndTime > AutoscalingPolicy.releaseTimeout) { + graph.addPoint(Point(lastJobEndTime, 0, graphIndex)) + } + + graph.addPoint(Point(jobTimeSpan.startTime, optimalExecutors, graphIndex)) + graph.addPoint(Point(jobTimeSpan.endTime, optimalExecutors, graphIndex)) + lastJobEndTime = jobTimeSpan.endTime + }) + graph.addPoint(Point(lastJobEndTime, 0, graphIndex)) + graph.addPoint(Point(ac.appInfo.endTime, 0, graphIndex)) + } +} diff --git a/src/main/scala/com/qubole/sparklens/app/ReporterApp.scala b/src/main/scala/com/qubole/sparklens/app/ReporterApp.scala index 19171b1..3d31496 100644 --- a/src/main/scala/com/qubole/sparklens/app/ReporterApp.scala +++ b/src/main/scala/com/qubole/sparklens/app/ReporterApp.scala @@ -71,15 +71,19 @@ object ReporterApp extends App { } private def reportFromSparklensDump(file: String): Unit = { + val json = readSparklenDump(file) + startAnalysersFromString(json) + + } + + def readSparklenDump(file: String): String = { val fs = FileSystem.get(new URI(file), new Configuration()) val path = new Path(file) val byteArray = new Array[Byte](fs.getFileStatus(path).getLen.toInt) fs.open(path).readFully(byteArray) - val json = (byteArray.map(_.toChar)).mkString - startAnalysersFromString(json) - + (byteArray.map(_.toChar)).mkString } def reportFromEventHistory(file: String): Unit = { diff --git a/src/main/scala/com/qubole/sparklens/autoscaling/AutoscalingPolicy.scala b/src/main/scala/com/qubole/sparklens/autoscaling/AutoscalingPolicy.scala new file mode 100644 index 0000000..2adeda1 --- /dev/null +++ b/src/main/scala/com/qubole/sparklens/autoscaling/AutoscalingPolicy.scala @@ -0,0 +1,257 @@ +package com.qubole.sparklens.autoscaling + +import java.util.concurrent.atomic.AtomicBoolean + +import com.qubole.sparklens +import com.qubole.sparklens.app.ReporterApp +import com.qubole.sparklens.common.AppContext +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.autoscaling.AutoscalingSparklensClient +import org.apache.spark.externalautoscaling.ExecutorAllocator +import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart} +import org.json4s.DefaultFormats +import org.json4s.JsonAST.JValue +import org.json4s.jackson.JsonMethods.parse +import org.slf4j.LoggerFactory + +import scala.collection.mutable +class AutoscalingPolicy(autoscalingClient: AutoscalingSparklensClient, sparkConf: SparkConf, + unitTestAppContext: Option[AppContext] = None) { + + import AutoscalingPolicy.log + private val map = init + + // always taking lock on currentExecutors for any action on add/remove/request executor + private val currentExecutors: mutable.Set[String] = new 
mutable.HashSet[String]() + private val execsToBeReleased: mutable.Set[String] = new mutable.HashSet[String]() + + @volatile private var initalized: AtomicBoolean = new AtomicBoolean(false) + @volatile private var lastNumExecutorsRequested: Int = + getDynamicAllocationInitialExecutors(sparkConf) + + + /** The following functions getDynamicAllocationInitialExecutors, + * getDynamicAllocationMinExecutors and + * getDynamicAllocationMaxExecutors + * have been copied from Qubole Spark repo's Utils.scala + */ + + /** + * Return the initial number of executors for dynamic allocation. + */ + def getDynamicAllocationInitialExecutors(conf: SparkConf): Int = { + conf.getInt("spark.dynamicAllocation.initialExecutors", + getDynamicAllocationMinExecutors(conf)) + } + + /** + * Return the minimum number of executors for dynamic allocation. + */ + def getDynamicAllocationMinExecutors(conf: SparkConf): Int = { + conf.getInt("spark.dynamicAllocation.minExecutors", + conf.getInt("spark.executor.instances", 0)) + } + + /** + * Return the maximum number of executors for dynamic allocation. + */ + def getDynamicAllocationMaxExecutors(conf: SparkConf): Int = { + val defaultMaxExecutors = conf.getInt("spark.qubole.internal.default.maxExecutors", + 2) + val maxExecutors = conf.getInt("spark.dynamicAllocation.maxExecutors", + conf.getInt("spark.qubole.max.executors", defaultMaxExecutors)) + maxExecutors.max(getDynamicAllocationMinExecutors(conf)) + } + + + // maybe this needs to be delayed till SparkContext has come up + private def init: Map[Int, Int] = { + log.debug("Init for sparklens autoscaling policy") + val olderAppContext = unitTestAppContext match { + case Some(context) => context + case None => getAppContext(sparkConf) match { + case Some(appContext) => + log.debug(s"Got appContext for previous run ${appContext}") + appContext + case None => + log.warn("Could not find any previous appContext for sparklens autoscaling.") + return Map.empty + } + } + + val coresPerExecutor = olderAppContext.executorMap.values.map(x => x.cores).sum / + olderAppContext.executorMap.size + createAutoscaleExec(olderAppContext, coresPerExecutor) + } + + private def getAppContext(conf: SparkConf): Option[AppContext] = { + sparklens.sparklensPreviousDump(conf) match { + case Some(file) => + val json = ReporterApp.readSparklenDump(file) + implicit val formats = DefaultFormats + val map = parse(json).extract[JValue] + Some(AppContext.getContext(map)) + case _ => None + } + } + + private def createAutoscaleExec(ac: AppContext, coresPerExecutor: Int): Map[Int, Int] = { + log.debug("Creating autoscaling policy based on previous run") + val maxConcurrentExecutors = AppContext.getMaxConcurrent(ac.executorMap, ac) + + ac.jobMap.values.map(jobTimeSpan => { + val optimalExecutors = jobTimeSpan.optimumNumExecutorsForJob(coresPerExecutor, + maxConcurrentExecutors.asInstanceOf[Int]) + log.debug(s"Executors required for job = ${jobTimeSpan.jobID} = ${optimalExecutors}") + jobTimeSpan.jobID.asInstanceOf[Int] -> optimalExecutors + }).toMap + } + + def scale(jobStart: SparkListenerJobStart): Unit = { + scale(jobStart.jobId, true) + } + + def scale(jobEnd: SparkListenerJobEnd): Unit = { + scale(jobEnd.jobId, false) + } + + def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = { + log.debug(s"Adding executor ${executorAdded.executorId}") + currentExecutors.synchronized { + currentExecutors.add(executorAdded.executorId) + } + // a new executor could come due to an old request which might not be needed now. 
So + // try to remove extra executors + scale(lastNumExecutorsRequested) + } + + def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = { + currentExecutors.synchronized { + if (!currentExecutors.contains(executorRemoved.executorId)) { + log.warn(s"Strange that currentExecutors does not have ${executorRemoved.executorId}.") + } + currentExecutors.remove(executorRemoved.executorId) + if (!execsToBeReleased.contains(executorRemoved.executorId)) { + log.warn("Exec to be release did not contain the executor. Someone else asked for " + + s"removal of this executor ${executorRemoved.executorId}") + } + log.debug(s"removing executor ${executorRemoved.executorId}") + execsToBeReleased.remove(executorRemoved.executorId) + } + } + + private def scale(jobId: Int, jobStart: Boolean): Unit = { + log.debug(s"scale called for jobId = ${jobId}, job started = ${jobStart}") + map.get(jobId) match { + case Some(numExecs) => + currentExecutors.synchronized { + jobStart match { + case true => + initalized.getAndSet(true) match { // initial num of executors + case false => + log.debug(s"Reset: Either starting, or the last job completed was more than " + + s"${AutoscalingPolicy.releaseTimeout}ms before. Scaling to ${numExecs} " + + s"executors specifically for job: ${jobId}") + scale(numExecs) + case true => + log.debug(s"Adding ${numExecs} for job ${jobId} to already existing " + + s"${lastNumExecutorsRequested}") + scale(lastNumExecutorsRequested + numExecs) + } + case false => + (lastNumExecutorsRequested - numExecs) match { + case 0 => // downscale only after release timeout + log.debug(s"Job ${jobId} complete, will scale down to 0 only after " + + s"${AutoscalingPolicy.releaseTimeout}ms if no other jobs would come in that " + + s"time.") + initalized.getAndSet(false) + new Thread(new Runnable { + override def run(): Unit = { + Thread.sleep(AutoscalingPolicy.releaseTimeout) + currentExecutors.synchronized { + if (initalized.get() == false) { + if (lastNumExecutorsRequested == numExecs) scale(0) // Still no jobs have come + } + } + } + }).start() + case _ => + log.debug(s"downscaling by ${numExecs} from current asked " + + s"${lastNumExecutorsRequested} for job ${jobId}") + scale(lastNumExecutorsRequested - numExecs) // parallel jobs, downscale + } + } + } + case _ => log.debug("map was empty while trying to scale") + } + } + + private def scale(numExecs: Int): Unit = { + log.debug(s"Last executors requested = ${lastNumExecutorsRequested}, and current request = " + + s"${numExecs}") + if (lastNumExecutorsRequested >= numExecs) { + currentExecutors.synchronized { + val toRemove = (currentExecutors.size - execsToBeReleased.size) - numExecs + val execsToRemove = (currentExecutors -- execsToBeReleased).take(toRemove).toSeq + log.debug( + s"""Current executors = ${currentExecutors}. Size = ${currentExecutors.size} + | Executors waiting to be released = ${execsToBeReleased}. Size = + | ${execsToBeReleased.size}. + | Number of executors to be removed now = ${toRemove}. 
+ | Executors chosen to be removed = ${execsToRemove} + """.stripMargin) + execsToRemove.foreach(execsToBeReleased.add(_)) + if (!execsToRemove.isEmpty) autoscalingClient.killExecutors(execsToRemove) + } + } + if (lastNumExecutorsRequested != numExecs) { + log.info(s"Asking autosclaling client to scale to ${numExecs} from previous " + + s"${lastNumExecutorsRequested}") + lastNumExecutorsRequested = numExecs + + // upscale only to a max number + val finalNum = Math.min(numExecs, getDynamicAllocationMaxExecutors(sparkConf)) + if (finalNum < numExecs) { + log.info(s"Although ${numExecs} number of executors requested, but limiting the request " + + s"to a maximum of ${finalNum} configured") + } + autoscalingClient.requestTotalExecutors(finalNum) + } + } +} + +object AutoscalingPolicy { + var releaseTimeout = 2 * 60 * 1000 // time between 2 jobs to release all resources + val log = LoggerFactory.getLogger(classOf[AutoscalingPolicy]) + + def init(sparkConf: SparkConf): Option[AutoscalingSparklensClient] = { + val spark = try { + SparkContext.getOrCreate(sparkConf) + } catch { // while replaying from event-history, SparkContext would not be present. + case _: Throwable => return None + } + + val klass = spark.getClass + + // this relection will be removed once OSS spark has pluggable autoscaling + val executorAllocatorMethod = try { + klass.getMethod("executorAllocator") + } catch { + case ne: NoSuchMethodException => + log.warn("Could not get executorAllocator from SparkContext. Will not be able to perform " + + "sparklens autoscaling") + return None + } + executorAllocatorMethod.invoke(spark).asInstanceOf[Option[ExecutorAllocator]] match { + case Some(client) if client.isInstanceOf[AutoscalingSparklensClient] => + Some(client.asInstanceOf[AutoscalingSparklensClient]) + case _ => + log.warn("Could not cast ExecutorAllocator to AutoscalingSparklensClient. 
Will not be " + + "able to perform sparklens autoscaling.") + None + } + } + def setTimeoutForUnitTest(timeout: Int): Unit = { + releaseTimeout = timeout + } +} diff --git a/src/main/scala/com/qubole/sparklens/chart/Graph.scala b/src/main/scala/com/qubole/sparklens/chart/Graph.scala new file mode 100644 index 0000000..f0b33cc --- /dev/null +++ b/src/main/scala/com/qubole/sparklens/chart/Graph.scala @@ -0,0 +1,155 @@ +package com.qubole.sparklens.chart + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +class Graph(width: Int = 100, length: Int = 30) { + + private val compressedPointsSet = mutable.HashMap[Char, mutable.ListBuffer[Point]]() + private val pointsSet = mutable.HashMap[Char, mutable.ListBuffer[Point]]() + + var graphString = mutable.ListBuffer[StringBuilder]() + + private def init = { + // 10 % buffer + (1 to length + (length / 10 ) ).foreach(_ => { + val builder = new StringBuilder() + (1 to width + (width / 10)).foreach(_ => builder.append(" ")) + graphString.append(builder) + }) + } + + init + + def addPoint(p: Point) = { + val list = if (pointsSet.get(p.c).isDefined) { + pointsSet(p.c) + } else new mutable.ListBuffer[Point]() + list.append(p) + pointsSet.put(p.c, list) + } + + + def plot(original: Char = 'o', simulated: Char = '*'): Unit = { + // find max of all graphs + val maxY = getMaxY() + + pointsSet.keys.foreach(c => { + fixYLimit(c, maxY) + plot(c) + }) + var topFlag = true + + //markOptimizationSpace(original, simulated) + + graphString.reverse.foreach(x => { + topFlag match { + case false => print(" | ") + case true => + print(" ^ ") + topFlag = false + } + println(x) + }) + + (1 to (width / 2)).foreach(_ => print("-")) + print(" time ") + (1 to (width / 2)).foreach(_ => print("-")) + println(">") + + } + + def getMaxY(): Int = { + pointsSet.values.map(_.maxBy(_.y)).maxBy(_.y).y + } + + def getMaxForChar(c: Char): Int = { + pointsSet(c).map(_.y).max + } + + private def plot(c: Char): Unit = { + val widestGraph = compressedPointsSet.values.toSeq.maxBy(x => x.last.x - x.head.x) + + val xJump = (widestGraph.last.x - widestGraph.head.x) / width + xJump match { + case 0 => // don't bother of spark-apps lasting less than *length* millisecond + case _ => largerGraph(compressedPointsSet(c), xJump) + } + } + + + private def fixYLimit(c: Char, maxY: Int) { + + val points = pointsSet(c) + + maxY match { + case large if large >= length => { + + val newPoints = mutable.ListBuffer[Point]() + + points.foreach(p => { + val newY: Int = (length * p.y) / maxY + //println(s"changing ${p.y} to ${newY}") + newPoints.append(Point(p.x, newY, p.c)) + }) + compressedPointsSet.put(c, newPoints) + } + case _ => compressedPointsSet.put(c, points) + } + } + + + private def largerGraph(sortedPoints: ListBuffer[Point], xJump: Long): Unit = { + val qu = scala.collection.mutable.Queue.empty[Point] + sortedPoints.foreach(qu.enqueue(_)) + //val qu = scala.collection.immutable.Queue(sortedPoints: _*) + + var index = 0 + var lastPoint: Point = null + var start = sortedPoints.head.x + var nextTarget = start + (index * xJump) + + while (!qu.isEmpty) { + val front = qu.dequeue + + // if nextTarget is still far, update the current index + if (nextTarget > front.x || lastPoint == null) { + //fill the graph + if (lastPoint == null) { + graphString(front.y).setCharAt(index, front.c) + } else { + (lastPoint.y + 1 to front.y).foreach(y => graphString(y).setCharAt(index, front.c)) + (front.y to lastPoint.y - 1).foreach(y => graphString(y).setCharAt(index, front.c)) + } + + } else { + 
val ep = extrapolationPoints(front.x, nextTarget, xJump) + val yJumps = (front.y - lastPoint.y) / ep + var lastY = lastPoint.y + (1 to ep).foreach(i => { + val y = if (i == ep) front.y + else lastPoint.y + (i * yJumps) + // fill the graph + (lastY + 1 to y).foreach(interimY => graphString(interimY).setCharAt(index, front.c)) + (y to lastY - 1).foreach(interimY => graphString(interimY).setCharAt(index, front.c)) + graphString(y).setCharAt(index, front.c) + lastY = y + index += 1 + }) + } + lastPoint = front + nextTarget = start + (index * xJump) + } + } + + private def extrapolationPoints(xPoint: Long, target: Long, xJump: Long): Int = { + var tmp = target + var jumps = 1 + while (tmp + xJump <= xPoint) { + tmp += xJump + jumps += 1 + } + jumps + } + +} diff --git a/src/main/scala/com/qubole/sparklens/chart/Point.scala b/src/main/scala/com/qubole/sparklens/chart/Point.scala new file mode 100644 index 0000000..dbea4dc --- /dev/null +++ b/src/main/scala/com/qubole/sparklens/chart/Point.scala @@ -0,0 +1,3 @@ +package com.qubole.sparklens.chart + +case class Point(x: Long, y: Int, c: Character) diff --git a/src/main/scala/com/qubole/sparklens/common/AppContext.scala b/src/main/scala/com/qubole/sparklens/common/AppContext.scala index 29f2e64..56da39b 100644 --- a/src/main/scala/com/qubole/sparklens/common/AppContext.scala +++ b/src/main/scala/com/qubole/sparklens/common/AppContext.scala @@ -66,11 +66,26 @@ case class AppContext(appInfo: ApplicationInfo, object AppContext { - def getMaxConcurrent[Span <: TimeSpan](map: mutable.HashMap[String, Span], + def getMaxConcurrent[Span <: TimeSpan, T <: Any](map: mutable.HashMap[T, Span], appContext: AppContext = null): Long = { // sort all start and end times on basis of timing - val sorted = map.values.flatMap(timeSpan => { + val sorted = getSortedMap(map, appContext) + + var count = 0L + var maxConcurrent = 0L + + sorted.foreach(tuple => { + count = count + tuple._2 + maxConcurrent = math.max(maxConcurrent, count) + }) + maxConcurrent + } + + def getSortedMap[Span <: TimeSpan, T <: Any](map: mutable.HashMap[T, Span], + appContext: AppContext = null): Array[(Long, Long)] = { + // sort all start and end times on basis of timing + map.values.flatMap(timeSpan => { val correctedEndTime = if (timeSpan.endTime == 0) { if (appContext == null) { System.currentTimeMillis() @@ -84,15 +99,6 @@ object AppContext { t1._2 > t2._2 } else t1._1 < t2._1 }) - - var count = 0L - var maxConcurrent = 0L - - sorted.foreach(tuple => { - count = count + tuple._2 - maxConcurrent = math.max(maxConcurrent, count) - }) - maxConcurrent } def getMap[T](map: mutable.HashMap[T, _ <: TimeSpan]): Map[String, Any] = { diff --git a/src/main/scala/com/qubole/sparklens/package.scala b/src/main/scala/com/qubole/sparklens/package.scala index 9320524..ee18265 100644 --- a/src/main/scala/com/qubole/sparklens/package.scala +++ b/src/main/scala/com/qubole/sparklens/package.scala @@ -17,4 +17,15 @@ package object sparklens { /* Even if reporting is in app, we can still dump sparklens data which could be used later */ conf.getBoolean("spark.sparklens.save.data", true) } + def sparklensPreviousDump(conf: SparkConf): Option[String] = { + conf.getOption("spark.sparklens.previous.data.file") + } + + // Width and length of autoscale graph + private[qubole] def autoscaleGraphDimensions(conf: SparkConf): List[Int] = { + conf.getOption("spark.sparklens.autoscale.graph.dimensions") match { + case Some(dimensions) => dimensions.split("x").toList.map(_.toInt) + case _ => List(100, 30) + } + } } diff --git 
a/src/main/scala/com/qubole/sparklens/timespan/JobTimeSpan.scala b/src/main/scala/com/qubole/sparklens/timespan/JobTimeSpan.scala index 1bb27c2..da2103e 100644 --- a/src/main/scala/com/qubole/sparklens/timespan/JobTimeSpan.scala +++ b/src/main/scala/com/qubole/sparklens/timespan/JobTimeSpan.scala @@ -18,6 +18,7 @@ package com.qubole.sparklens.timespan import com.qubole.sparklens.common.{AggregateMetrics, AppContext} +import com.qubole.sparklens.scheduler.CompletionEstimator import org.apache.spark.executor.TaskMetrics import org.apache.spark.scheduler.TaskInfo import org.json4s.DefaultFormats @@ -90,6 +91,23 @@ class JobTimeSpan(val jobID: Long) extends TimeSpan { "jobMetrics" -> jobMetrics.getMap, "stageMap" -> AppContext.getMap(stageMap)) ++ super.getStartEndTime() } + + def optimumNumExecutorsForJob(coresPerExecutor: Int, maxConcurrent: Int): Int = { + val realTime = this.endTime - this.startTime + var high = maxConcurrent + var low = 1 + while(high != low) { + val mid = (low + high) / 2 + + val simulationTime = CompletionEstimator.estimateJobWallClockTime(this, mid, coresPerExecutor) + if (simulationTime <= realTime) { + high = mid + } else low = mid + 1 + } + low + } + + } object JobTimeSpan { diff --git a/src/main/scala/org/apache/spark/autoscaling/AutoscalingSparklensClient.scala b/src/main/scala/org/apache/spark/autoscaling/AutoscalingSparklensClient.scala new file mode 100644 index 0000000..61b8dc3 --- /dev/null +++ b/src/main/scala/org/apache/spark/autoscaling/AutoscalingSparklensClient.scala @@ -0,0 +1,27 @@ +package org.apache.spark.autoscaling + +import org.apache.spark.ExecutorAllocationClient +import org.apache.spark.externalautoscaling.ExecutorAllocator + +class AutoscalingSparklensClient(client: ExecutorAllocationClient) extends ExecutorAllocator { + + override def start(): Unit = {} + + override def stop(): Unit = {} + + def requestTotalExecutors(num: Int): Unit = { + client.requestTotalExecutors(num, 0, Map.empty) + } + + def killExecutors(execs: Seq[String]): Unit = { + try { + client.killExecutors(execs) + } catch { + case nsme: NoSuchMethodError => // not spark-2.0.0, using only for spark-2.3 + val method = client.getClass.getMethod("killExecutors", classOf[Seq[String]], + classOf[Boolean], classOf[Boolean], classOf[Boolean]) + method.invoke(client, execs, + boolean2Boolean(true), boolean2Boolean(false), boolean2Boolean(true)) + } + } +} \ No newline at end of file diff --git a/src/main/scala/org/apache/spark/externalautoscaling/ExecutorAllocator.scala b/src/main/scala/org/apache/spark/externalautoscaling/ExecutorAllocator.scala new file mode 100644 index 0000000..10d9086 --- /dev/null +++ b/src/main/scala/org/apache/spark/externalautoscaling/ExecutorAllocator.scala @@ -0,0 +1,8 @@ +package org.apache.spark.externalautoscaling + +trait ExecutorAllocator { + + def start(): Unit + + def stop(): Unit +} diff --git a/src/test/scala/com/qubole/sparklens/autoscaling/AutoscalingPolicySuite.scala b/src/test/scala/com/qubole/sparklens/autoscaling/AutoscalingPolicySuite.scala new file mode 100644 index 0000000..946b1fc --- /dev/null +++ b/src/test/scala/com/qubole/sparklens/autoscaling/AutoscalingPolicySuite.scala @@ -0,0 +1,250 @@ +package com.qubole.sparklens.autoscaling + +import com.qubole.sparklens.common.{AggregateMetrics, AppContext, ApplicationInfo} +import com.qubole.sparklens.timespan.{ExecutorTimeSpan, HostTimeSpan, JobTimeSpan, StageTimeSpan} +import org.apache.spark.{DummyExecutorAllocator, SparkConf} +import 
org.apache.spark.autoscaling.DummyAutoscalingSparklensClient +import org.apache.spark.scheduler.{SparkListenerExecutorAdded, SparkListenerExecutorRemoved, SparkListenerJobEnd, SparkListenerJobStart} +import org.scalatest.FunSuite + +import scala.collection.mutable + +class AutoscalingPolicySuite extends FunSuite { + + /** + * These tests do not automate the times at which jobs start and end. Automating this + * would make these tests better. + */ + test("basic test") { + /* List with `(jobId, startTime, endTime, optimumExecutors)` */ + val proxyJobList = List(List(1L, 1, 10, 2), List(2L, 11, 20, 4), List(3L, 21, 30, 1)) + + val (policy, client) = getPolicy(proxyJobList) + + // check that two executors have been added when job 1 starts + policy.scale(new SparkListenerJobStart(1, 0, Seq.empty)) + assert (client.updateRequests.size == 1) + assert (client.updateRequests.last == 2) + + // assert nothing changes when job 1 ends. + policy.scale(new SparkListenerJobEnd(1, 0, null)) + assert (client.updateRequests.size == 1) + assert (client.killRequests.size == 0) + + // assert that a total of 4 executors is requested when the 2nd job starts + policy.scale(new SparkListenerJobStart(2, 0, Seq.empty)) + assert (client.updateRequests.size == 2) + assert (client.updateRequests.last == 4) + + // assert nothing changes when job 2 ends + policy.scale(new SparkListenerJobEnd(2, 0, null)) + assert (client.updateRequests.size == 2) + assert (client.killRequests.size == 0) + + // add all the requested executors. Should be done before removal of executors is tried. + policy.onExecutorAdded(new SparkListenerExecutorAdded(0, "1", null)) + policy.onExecutorAdded(new SparkListenerExecutorAdded(0, "2", null)) + policy.onExecutorAdded(new SparkListenerExecutorAdded(0, "3", null)) + policy.onExecutorAdded(new SparkListenerExecutorAdded(0, "4", null)) + + + // assert that 3 executors are asked to be released when job 3 starts + policy.scale(new SparkListenerJobStart(3, 0, Seq.empty)) + assert (client.killRequests.size == 1) + assert (client.killRequests.last.size == 3) + assert (client.updateRequests.size == 3) + assert (client.updateRequests.last == 1) + + // assert that the remaining 1 executor is also removed after job 3 (the last job) is complete. 
+ policy.scale(new SparkListenerJobEnd(3, 0, null)) + Thread.sleep(AutoscalingPolicy.releaseTimeout * 4) + assert (client.killRequests.size == 2) + assert (client.killRequests.last.size == 1) + + } + + test ("executors get removed if enough gap between jobs") { + /* List with `(jobId, startTime, endTime, optimumExecutors)` */ + val proxyJobList = List(List(1L, 1, 10, 10), + List(2L, AutoscalingPolicy.releaseTimeout + 50, AutoscalingPolicy.releaseTimeout + 60, 4)) + val (policy, client) = getPolicy(proxyJobList) + + // Start and end the first job + policy.scale(new SparkListenerJobStart(1, 0, Seq.empty)) + (1 to 10).foreach(x => policy.onExecutorAdded(new SparkListenerExecutorAdded(0, x.toString, null))) + policy.scale(new SparkListenerJobEnd(1, 0, null)) + + // check that all executors have been asked to be released + Thread.sleep(AutoscalingPolicy.releaseTimeout * 4) + assert (client.killRequests.size == 1) + assert (client.killRequests.last.size == 10) + + // when the new job starts, it should ask for 4 more executors + policy.scale(new SparkListenerJobStart(2, 0, Seq.empty)) + assert (client.updateRequests.last == 4) + assert (client.killRequests.size == 1) + } + + test ("No more requests to allocator client if previous requests have already been made") { + /* List with `(jobId, startTime, endTime, optimumExecutors)` */ + val proxyJobList = List(List(1L, 1, 10, 4), List(2L, 11, 21, 4)) + val (policy, client) = getPolicy(proxyJobList) + + // Start and end job 1, but no executors have been provided yet. + policy.scale(new SparkListenerJobStart(1, 0, Seq.empty)) + policy.scale(new SparkListenerJobEnd(1, 0, null)) + assert (client.updateRequests.size == 1) + assert (client.updateRequests.last == 4) + + // start job 2, but no new request to the client since 4 executors have already been requested + policy.scale(new SparkListenerJobStart(2, 0, Seq.empty)) + assert (client.updateRequests.size == 1) + } + + + test ("Asking for killing of executors should be independent of how many executors have " + + "actually been killed by the client") { + /* List with `(jobId, startTime, endTime, optimumExecutors)` */ + val proxyJobList = List(List(1L, 1, 10, 20), List(2L, 11, 20, 10), List(3L, 21, 30, 5), List + (4L, 31, 40, 2)) + val (policy, client) = getPolicy(proxyJobList) + + // start and end job 1 + policy.scale(new SparkListenerJobStart(1, 0, Seq.empty)) + policy.scale(new SparkListenerJobEnd(1, 0, null)) + (1 to 20).foreach(x => policy.onExecutorAdded(new SparkListenerExecutorAdded(0, x.toString, null))) + + // start job 2; it will ask to kill 10 executors, but do not kill them yet. + policy.scale(new SparkListenerJobStart(2, 0, Seq.empty)) + assert (client.killRequests.size == 1) + assert (client.killRequests.last.size == 10) + + // finish job 2 and start job 3. More kill requests come, even though the earlier executors have not + // been killed + policy.scale(new SparkListenerJobEnd(2, 0, null)) + policy.scale(new SparkListenerJobStart(3, 0, Seq.empty)) + assert (client.killRequests.size == 2) + assert (client.killRequests.last.size == 5) + assert (client.killRequests.head.toSet.intersect(client.killRequests.last.toSet).size == 0) + + // finish job 3. Some executors from both previous remove requests have been removed, but not all. 
+ policy.scale(new SparkListenerJobEnd(3, 0, null)) + List(client.killRequests.head, client.killRequests.last).foreach(_.take(2).foreach(x => { + policy.onExecutorRemoved(new SparkListenerExecutorRemoved(0, x.toString, null)) + })) + + // despite these partial executor removals, the kill requests sent to the client should be + // consistent + policy.scale(new SparkListenerJobStart(4, 0, Seq.empty)) + assert (client.killRequests.size == 3) + assert (client.killRequests.last.size == 3) // executors decreased from 5 to 2. + assert (client.killRequests.last.toSet.intersect( + client.killRequests.slice(0,1).flatMap(x => x).toSet).size == 0) + + + // remove the remaining 2 executors after completion of the 4th job + policy.scale(new SparkListenerJobEnd(4, 0, null)) + Thread.sleep(AutoscalingPolicy.releaseTimeout * 4) + assert (client.killRequests.size == 4) + assert (client.killRequests.last.size == 2) + + assert (client.killRequests.last.toSet.intersect( + client.killRequests.slice(0,2).flatMap(x => x).toSet).size == 0) + } + + test ("do not upscale beyond max") { + /* List with `(jobId, startTime, endTime, optimumExecutors)` */ + val proxyJobList = List(List(1L, 1, 10, 2000)) + val (policy, client) = getPolicy(proxyJobList) + + policy.scale(new SparkListenerJobStart(1, 0, Seq.empty)) + + assert (client.updateRequests.size == 1) + assert (client.updateRequests.last == 1000) + } + + test ("parallel jobs") { + val proxyJobList = List(List(1L, 1, 20, 20), List(2L, 11, 30, 10), List(3L, 15, 30, 5), List + (4L, 31 + AutoscalingPolicy.releaseTimeout, 40, 2)) + val (policy, client) = getPolicy(proxyJobList) + + // start job 1, check that 20 executors are requested + policy.scale(new SparkListenerJobStart(1, 0, Seq.empty)) + assert (client.updateRequests.size == 1) + assert (client.updateRequests.last == 20) + + // start job 2, check for 30 executors total now. 
+ policy.scale(new SparkListenerJobStart(2, 0, Seq.empty)) + assert (client.updateRequests.size == 2) + assert (client.updateRequests.last == 30) + + // start job 3, check for 35 executors total + policy.scale(new SparkListenerJobStart(3, 0, Seq.empty)) + assert (client.updateRequests.size == 3) + assert (client.updateRequests.last == 35) + + // Add all the executors + (1 to 35).foreach(x => policy.onExecutorAdded(new SparkListenerExecutorAdded(0, x.toString, null))) + + // complete job 1, immediately remove corresponding executors + policy.scale(new SparkListenerJobEnd(1, 0, null)) + assert (client.killRequests.size == 1) + assert (client.killRequests.last.size == 20) + + // complete job 2, immediately remove corresponding executors + policy.scale(new SparkListenerJobEnd(2, 0, null)) + assert (client.killRequests.size == 2) + assert (client.killRequests.last.size == 10) + + // complete job 3, donot see that kill request has come for this + policy.scale(new SparkListenerJobEnd(3, 0, null)) + assert (client.killRequests.size == 2) + + // after waiting for release timeout, this should be released + Thread.sleep(AutoscalingPolicy.releaseTimeout * 4) + assert (client.killRequests.size == 3) + assert (client.killRequests.last.size == 5) + + } + + private def getPolicy(proxyJobList: List[List[Long]]): + (AutoscalingPolicy, DummyAutoscalingSparklensClient) = { + + AutoscalingPolicy.setTimeoutForUnitTest(100) + val appInfo = new ApplicationInfo() + appInfo.startTime = 0 + + val client = new DummyAutoscalingSparklensClient(new DummyExecutorAllocator()) + val executorId = "dummyExecutorId" + val execTimeSpan = new ExecutorTimeSpan(executorId, "dummyHostId", 4) + + val jobList = proxyJobList.sortWith(_.head < _.head).map(job => { + val jobTimeSpan = new DummyJobTimeSpan(job.head, job.last.asInstanceOf[Int]) + jobTimeSpan.startTime = job(1) + jobTimeSpan.endTime = job(2) + (job.head -> jobTimeSpan) + }) + val jobMap = collection.mutable.HashMap(jobList: _*).asInstanceOf[mutable.HashMap[Long, JobTimeSpan]] + + val executorMap = collection.mutable.HashMap(Seq(executorId -> execTimeSpan): _*) + .asInstanceOf[mutable.HashMap[String, ExecutorTimeSpan]] + val ac = new AppContext(appInfo, + new AggregateMetrics, + mutable.HashMap.empty[String, HostTimeSpan], + executorMap, + jobMap, + mutable.HashMap.empty[Int, StageTimeSpan], + mutable.HashMap.empty[Int, Long] + ) + (new AutoscalingPolicy(client, new SparkConf().set("spark.dynamicAllocation.maxExecutors", + "1000"), + Some(ac)), client) + } + + class DummyJobTimeSpan(jobID: Long, optimumNumExecs: Int) extends JobTimeSpan(jobID) { + override def optimumNumExecutorsForJob(coresPerExecutor: Int, maxConcurrent: Int): Int = { + optimumNumExecs + } + } + +} diff --git a/src/test/scala/org/apache/spark/DummyExecutorAllocator.scala b/src/test/scala/org/apache/spark/DummyExecutorAllocator.scala new file mode 100644 index 0000000..4647cf5 --- /dev/null +++ b/src/test/scala/org/apache/spark/DummyExecutorAllocator.scala @@ -0,0 +1,11 @@ +package org.apache.spark + +class DummyExecutorAllocator extends ExecutorAllocationClient { + override private[spark] def getExecutorIds() = ??? + + override private[spark] def requestTotalExecutors(numExecutors: Int, localityAwareTasks: Int, hostToLocalTaskCount: Map[String, Int]) = ??? + + override def requestExecutors(numAdditionalExecutors: Int): Boolean = ??? + + override def killExecutors(executorIds: Seq[String]): Boolean = ??? 
+} diff --git a/src/test/scala/org/apache/spark/autoscaling/DummyAutoscalingSparklensClient.scala b/src/test/scala/org/apache/spark/autoscaling/DummyAutoscalingSparklensClient.scala new file mode 100644 index 0000000..8e99a8b --- /dev/null +++ b/src/test/scala/org/apache/spark/autoscaling/DummyAutoscalingSparklensClient.scala @@ -0,0 +1,21 @@ +package org.apache.spark.autoscaling + +import org.apache.spark.ExecutorAllocationClient + +import scala.collection.mutable + +class DummyAutoscalingSparklensClient(client: ExecutorAllocationClient) + extends AutoscalingSparklensClient(client) { + + val updateRequests = mutable.ArrayBuffer.empty[Int] + val killRequests = mutable.ArrayBuffer.empty[Seq[String]] + + override def requestTotalExecutors(num: Int): Unit = { + updateRequests.append(num) + } + + override def killExecutors(execs: Seq[String]): Unit = { + killRequests.append(execs) + } + +}
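
Reviewer note: the knobs this patch reads are spread across package.scala and AutoscalingPolicy. Below is a minimal wiring sketch; the two spark.sparklens.* keys and the 100x30 default come from this diff, the listener registration and dynamicAllocation keys are standard Spark/sparklens settings, and the dump path is a made-up placeholder.

    import org.apache.spark.SparkConf

    // Illustrative wiring only; the dump path below is a placeholder, not a real location.
    val conf = new SparkConf()
      .set("spark.extraListeners", "com.qubole.sparklens.QuboleJobListener")                     // existing sparklens listener
      .set("spark.sparklens.previous.data.file", "s3://some-bucket/sparklens/previous-app.json") // dump from an earlier run
      .set("spark.sparklens.autoscale.graph.dimensions", "100x30")                               // WIDTHxLENGTH of the ASCII chart
      .set("spark.dynamicAllocation.minExecutors", "0")
      .set("spark.dynamicAllocation.maxExecutors", "50")                                         // cap enforced in AutoscalingPolicy.scale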
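Reviewer note: JobTimeSpan.optimumNumExecutorsForJob is a binary search for the smallest executor count whose simulated wall-clock time does not exceed the observed duration; it relies on CompletionEstimator.estimateJobWallClockTime being non-increasing in the executor count. A self-contained sketch of the same search, with a stubbed cost function standing in for the estimator:

    // Sketch of the search in optimumNumExecutorsForJob. `simulate` stands in for
    // CompletionEstimator.estimateJobWallClockTime and is assumed to be monotonically
    // non-increasing in the executor count.
    def smallestSufficientExecutors(realTime: Long, maxConcurrent: Int)(simulate: Int => Long): Int = {
      var low = 1
      var high = maxConcurrent
      while (low < high) {
        val mid = (low + high) / 2
        if (simulate(mid) <= realTime) high = mid // mid executors are enough, try fewer
        else low = mid + 1                        // too slow with mid executors, need more
      }
      low
    }

    // Example: a perfectly divisible 1000-unit job observed to take 250 units needs 4 executors.
    val needed = smallestSufficientExecutors(realTime = 250, maxConcurrent = 16)(n => 1000L / n)
    assert(needed == 4)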
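Reviewer note: the getSortedMap refactor in AppContext keeps getMaxConcurrent's behaviour: each span contributes a +1 event at its start and a -1 event at its end, events are sorted by time (with +1 sorting before -1 on ties), and the peak of the running sum is the answer. A standalone illustration of the same sweep, independent of the TimeSpan classes:

    // Sweep over (time, +/-1) events, mirroring getSortedMap's ordering
    // (ties broken so that +1 sorts before -1).
    def maxConcurrent(spans: Seq[(Long, Long)]): Long = {
      val events = spans
        .flatMap { case (start, end) => Seq((start, 1L), (end, -1L)) }
        .sortWith((a, b) => if (a._1 == b._1) a._2 > b._2 else a._1 < b._1)
      var count = 0L
      var peak = 0L
      events.foreach { e => count += e._2; peak = math.max(peak, count) }
      peak
    }

    // Two overlapping spans plus a later one => peak concurrency of 2.
    assert(maxConcurrent(Seq((0L, 10L), (5L, 15L), (20L, 30L))) == 2L)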