Skip to content

Commit

Permalink
Merge pull request datastax#976 from datastax/SPARKC-377-master
Browse files Browse the repository at this point in the history
SPARKC-377 shut down cluster and client connection sequentially
  • Loading branch information
jtgrabowski committed May 23, 2016
2 parents 26b8276 + 2443950 commit 0b1c5aa
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 13 deletions.
10 changes: 10 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,16 @@ scala:
- 2.10.5
- 2.11.7

cache:
directories:
- $HOME/.ivy2/cache
- $HOME/.sbt/boot/

before_cache:
# Tricks to avoid unnecessary cache updates
- find $HOME/.ivy2 -name "ivydata-*.properties" -delete
- find $HOME/.sbt -name "*.lock" -delete

script:
- "sbt ++$TRAVIS_SCALA_VERSION -Dtravis=true test"
- "sbt ++$TRAVIS_SCALA_VERSION -Dtravis=true it:test"
Expand Down
3 changes: 1 addition & 2 deletions project/Settings.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ object Settings extends Build {
val osmxBean = ManagementFactory.getOperatingSystemMXBean.asInstanceOf[OperatingSystemMXBean]
val sysMemoryInMB = osmxBean.getTotalPhysicalMemorySize >> 20
val singleRunRequiredMem = 3 * 1024 + 512
val parallelTasks = if (isTravis) 2 else Math.max(1, ((sysMemoryInMB - 1550) / singleRunRequiredMem).toInt)
val parallelTasks = if (isTravis) 1 else Math.max(1, ((sysMemoryInMB - 1550) / singleRunRequiredMem).toInt)

// Due to lack of entrophy on virtual machines we want to use /dev/urandom instead of /dev/random
val useURandom = Files.exists(Paths.get("/dev/urandom"))
Expand All @@ -67,7 +67,6 @@ object Settings extends Build {
val cassandraTestVersion = sys.props.get("test.cassandra.version").getOrElse(Versions.Cassandra)

lazy val TEST_JAVA_OPTS = Seq(
"-Xms256m",
"-Xmx512m",
s"-Dtest.cassandra.version=$cassandraTestVersion",
"-Dsun.io.serialization.extendedDebugInfo=true",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,20 @@ object EmbeddedCassandra {
}
}

Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
private val shutdownThread: Thread = new Thread("Shutdown embedded C* hook thread") {
override def run() = {
cassandraRunners.flatten.foreach(_.destroy())
release()
shutdown()
}
}))
}

Runtime.getRuntime.addShutdownHook(shutdownThread)

private[connector] def shutdown(): Unit = {
cassandraRunners.flatten.foreach(_.destroy())
release()
}

private[connector] def removeShutdownHook(): Boolean = {
Runtime.getRuntime.removeShutdownHook(shutdownThread)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import org.apache.commons.lang3.StringUtils
import org.scalatest._

import com.datastax.driver.core.Session
import com.datastax.spark.connector.embedded.SparkTemplate
import com.datastax.spark.connector.embedded.{EmbeddedCassandra, SparkTemplate}
import com.datastax.spark.connector.testkit.{AbstractSpec, SharedEmbeddedCassandra}
import com.datastax.spark.connector.util.SerialShutdownHooks


trait SparkCassandraITFlatSpecBase extends FlatSpec with SparkCassandraITSpecBase
Expand Down Expand Up @@ -57,4 +58,11 @@ trait SparkCassandraITSpecBase extends Suite with Matchers with SharedEmbeddedCa
object SparkCassandraITSpecBase {
val executor = Executors.newFixedThreadPool(100)
val ec = ExecutionContext.fromExecutor(executor)

EmbeddedCassandra.removeShutdownHook
// now embedded C* won't shutdown itself, let's do it in serial fashion
SerialShutdownHooks.add("Shutting down all Cassandra runners")(() => {
EmbeddedCassandra.shutdown
})

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import org.apache.spark.{Logging, SparkConf}

import com.datastax.driver.core.{Cluster, Host, Session}
import com.datastax.spark.connector.cql.CassandraConnectorConf.CassandraSSLConf

import com.datastax.spark.connector.util.SerialShutdownHooks

/** Provides and manages connections to Cassandra.
*
Expand Down Expand Up @@ -179,11 +179,9 @@ object CassandraConnector extends Logging {
hosts.map(h => conf.copy(hosts = Set(h.getAddress))) + conf.copy(hosts = hosts.map(_.getAddress))
}

Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
def run() {
sessionCache.shutdown()
}
}))
SerialShutdownHooks.add("Clearing session cache for C* connector")(() => {
sessionCache.shutdown()
})

/** Returns a CassandraConnector created from properties found in the [[org.apache.spark.SparkConf SparkConf]] object */
def apply(conf: SparkConf): CassandraConnector = {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.datastax.spark.connector.util

import scala.collection.mutable

import org.apache.spark.Logging

private[connector] object SerialShutdownHooks extends Logging {

private val hooks = mutable.Map[String, () => Unit]()
@volatile private var isShuttingDown = false

def add(name: String)(body: () => Unit): Unit = SerialShutdownHooks.synchronized {
if (isShuttingDown) {
logError(s"Adding shutdown hook ($name) during shutting down is not allowed.")
} else {
hooks.put(name, body)
}
}

Runtime.getRuntime.addShutdownHook(new Thread("Serial shutdown hooks thread") {
override def run(): Unit = {
SerialShutdownHooks.synchronized {
isShuttingDown = true
}
for ((name, task) <- hooks) {
try {
logDebug(s"Running shutdown hook: $name")
task()
logInfo(s"Successfully executed shutdown hook: $name")
} catch {
case exc: Throwable =>
logError(s"Shutdown hook ($name) failed", exc)
}
}
}
})
}

0 comments on commit 0b1c5aa

Please sign in to comment.