Skip to content

Commit

Permalink
SPARKC-230 add minimal threshold for number of partitions
Browse files Browse the repository at this point in the history
This changes CassandraPartitionGenerator so that it takes only
splitCount parameter (instead of splitCount and splitSize parameters).
This change is propagated down to Clusterer and Splitters.

This also adds a minimal threshold for the number of partitions generated by
CassandraPartitionGenerator. The minimal threshold is evaluated as:
1 + context.defaultParallelism * 2
  • Loading branch information
jtgrabowski committed Aug 1, 2016
1 parent 15b14bc commit 47dd1c7
Show file tree
Hide file tree
Showing 26 changed files with 491 additions and 418 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ See [Building And Artifacts](doc/12_building_and_artifacts.md)
- [The Spark Shell](doc/13_spark_shell.md)
- [DataFrames](doc/14_data_frames.md)
- [Python](doc/15_python.md)
- [Partitioner](doc/16_partitioning.md)
- [Frequently Asked Questions](doc/FAQ.md)
- [Configuration Parameter Reference Table](doc/reference.md)

Expand Down
6 changes: 6 additions & 0 deletions doc/FAQ.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ the rpc_address is set to.
When troubleshooting Cassandra connections it is sometimes useful to set the rpc_address in the
C* yaml file to `0.0.0.0` so any incoming connection will work.

### How does the connector evaluate the number of Spark partitions?

The Connector evaluates the number of Spark partitions by dividing the table size estimate by
the `input.split.size_in_mb` value. The resulting number of partitions is never smaller than
`1 + 2 * SparkContext.defaultParallelism`.

### What does input.split.size_in_mb use to determine size?

Input.split.size_in_mb uses an internal system table in C* ( >= 2.1.5) to determine the size
Expand Down
2 changes: 1 addition & 1 deletion doc/reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ OSS Cassandra this should never be used.</td>
<tr>
<td><code>input.split.size_in_mb</code></td>
<td>64</td>
<td>Approx amount of data to be fetched into a Spark partition</td>
<td>Approx amount of data to be fetched into a Spark partition. Minimum number of resulting Spark partitions is <code>1 + 2 * SparkContext.defaultParallelism</code></td>
</tr>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,16 @@ package com.datastax.spark.connector.rdd

import java.io.IOException

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.Future

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.embedded.SparkTemplate._
import com.datastax.spark.connector.embedded._
import com.datastax.spark.connector.japi.CassandraJavaUtil._
import com.datastax.spark.connector.japi.CassandraRow
import com.datastax.spark.connector.types.TypeConverter
import org.apache.commons.lang3.tuple
import org.apache.spark.api.java.function.{Function => JFunction}

import scala.collection.JavaConversions._

class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase {
Expand Down Expand Up @@ -80,6 +77,13 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase {
session.execute(s"INSERT INTO $ks.wide_rows(key, group, value) VALUES (20, 20, '2020')")
session.execute(s"INSERT INTO $ks.wide_rows(key, group, value) VALUES (20, 21, '2021')")
session.execute(s"INSERT INTO $ks.wide_rows(key, group, value) VALUES (20, 22, '2022')")
},

Future {
session.execute(s"CREATE TABLE $ks.limit_test_table (key INT, value TEXT, PRIMARY KEY (key))")
for(i <- 0 to 30) {
session.execute(s"INSERT INTO $ks.limit_test_table (key, value) VALUES ($i, '$i')")
}
}
)
}
Expand Down Expand Up @@ -413,9 +417,10 @@ class CassandraJavaRDDSpec extends SparkCassandraITFlatSpecBase {
}

it should "allow to set limit" in {
val rdd = javaFunctions(sc).cassandraTable(ks, "test_table").limit(1L)
val limit = 1
val rdd = javaFunctions(sc).cassandraTable(ks, "limit_test_table").limit(limit.toLong)
val result = rdd.collect()
result should have size 1
result.size shouldBe <= (rdd.getNumPartitions * limit)
}

it should "allow to set ascending ordering" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,11 +357,6 @@ class CassandraRDDSpec extends SparkCassandraITFlatSpecBase {
result.head.getString("value") should startWith("000")
}

it should "use a single partition per node for a tiny table" in {
val rdd = sc.cassandraTable(ks, "key_value")
rdd.partitions should have length conn.hosts.size
}

it should "support single partition where clauses" in {
val someCass = sc
.cassandraTable[KeyValue](ks, "key_value")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.datastax.spark.connector.rdd

import org.apache.cassandra.tools.NodeProbe
import org.scalatest.Inspectors

import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
import com.datastax.spark.connector.cql.CassandraConnector
import com.datastax.spark.connector.embedded.{CassandraRunner, EmbeddedCassandra}
import com.datastax.spark.connector.rdd.partitioner.DataSizeEstimates
import com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory

/**
 * Integration tests for the number of Spark partitions produced by [[CassandraTableScanRDD]].
 *
 * Each test builds an RDD over the table populated in `beforeAll`, overriding
 * `minimalSplitCount` so that the interaction between the user-provided split count,
 * the size-estimated split count and the minimal threshold can be checked in isolation.
 */
class CassandraTableScanRDDSpec extends SparkCassandraITFlatSpecBase with Inspectors {

  useCassandraConfig(Seq("cassandra-default.yaml.template"))
  useSparkConf(defaultConf)

  val conn = CassandraConnector(defaultConf)
  val tokenFactory = TokenFactory.forSystemLocalPartitioner(conn)
  val tableName = "data"

  // Smallest possible Int: used as a threshold that never exceeds a computed split count.
  val noMinimalThreshold = Int.MinValue

  "CassandraTableScanRDD" should "favor user provided split count over minimal threshold" in {
    val userProvidedSplitCount = 8
    val minimalSplitCountThreshold = 32
    val rdd = getCassandraTableScanRDD(
      splitSizeMB = 1,
      splitCount = Some(userProvidedSplitCount),
      minimalSplitCountThreshold = minimalSplitCountThreshold)

    val partitions = rdd.getPartitions

    partitions.length should be(userProvidedSplitCount +- 1)
  }

  it should "favor user provided split count over size-estimated partitions" in {
    val userProvidedSplitCount = 8
    val rdd = getCassandraTableScanRDD(
      splitSizeMB = 1,
      splitCount = Some(userProvidedSplitCount),
      minimalSplitCountThreshold = noMinimalThreshold)

    val partitions = rdd.getPartitions

    partitions.length should be(userProvidedSplitCount +- 1)
  }

  it should "create size-estimated partitions with splitSize size" in {
    val rdd = getCassandraTableScanRDD(
      splitSizeMB = 1,
      minimalSplitCountThreshold = noMinimalThreshold)

    val partitions = rdd.getPartitions

    // theoretically there should be 64 splits, but it is ok to be "a little" inaccurate
    partitions.length should (be >= 16 and be <= 256)
  }

  it should "create size-estimated partitions when above minimal threshold" in {
    val minimalSplitCountThreshold = 2
    val rdd = getCassandraTableScanRDD(
      splitSizeMB = 1,
      minimalSplitCountThreshold = minimalSplitCountThreshold)

    val partitions = rdd.getPartitions

    // theoretically there should be 64 splits, but it is ok to be "a little" inaccurate
    partitions.length should (be >= 16 and be <= 256)
  }

  it should "create size-estimated partitions but not less than minimum partitions threshold" in {
    val minimalSplitCountThreshold = 64
    val rdd = getCassandraTableScanRDD(
      splitSizeMB = 32,
      minimalSplitCountThreshold = minimalSplitCountThreshold)

    val partitions = rdd.getPartitions

    partitions.length should be >= minimalSplitCountThreshold
  }

  it should "align index fields of partitions with their place in the array" in {
    val minimalSplitCountThreshold = 64
    val rdd = getCassandraTableScanRDD(
      splitSizeMB = 32,
      minimalSplitCountThreshold = minimalSplitCountThreshold)

    val partitions = rdd.getPartitions

    forAll(partitions.zipWithIndex) { case (part, index) => part.index should be(index) }
  }

  /**
   * Creates and populates the test table, flushes it on every host and then waits until
   * data size estimates for the table are available.
   *
   * NOTE(review): this override does not call `super.beforeAll()` - confirm that the base
   * spec has no setup of its own that must run.
   */
  override def beforeAll(): Unit = {
    conn.withSessionDo { session =>

      session.execute(s"CREATE KEYSPACE IF NOT EXISTS $ks " +
        s"WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': 1 }")

      session.execute(s"CREATE TABLE $ks.$tableName(key int primary key, value text)")
      val st = session.prepare(s"INSERT INTO $ks.$tableName(key, value) VALUES(?, ?)")
      // 1M rows x 64 bytes of payload = 64 MB of data + overhead
      for (i <- (1 to 1000000).par) {
        val key = i.asInstanceOf[AnyRef]
        val value = "123456789.123456789.123456789.123456789.123456789.123456789."
        session.execute(st.bind(key, value))
      }
    }

    // Flush the table on every host.
    for (host <- conn.hosts) {
      val nodeProbe = new NodeProbe(host.getHostAddress,
        EmbeddedCassandra.cassandraRunners(0).map(_.jmxPort).getOrElse(CassandraRunner.DefaultJmxPort))
      // Close the probe even when the flush fails, so the JMX connection is not leaked.
      try nodeProbe.forceKeyspaceFlush(ks, tableName)
      finally nodeProbe.close()
    }

    val timeout = CassandraRunner.SizeEstimatesUpdateIntervalInSeconds * 1000 * 5
    assert(DataSizeEstimates.waitForDataSizeEstimates(conn, ks, tableName, timeout),
      s"Data size estimates not present after $timeout ms. Test cannot be finished.")
  }

  /**
   * Builds a table scan RDD over the test table with the given read configuration and a
   * fixed minimal split count.
   *
   * @param splitSizeMB                value passed to `ReadConf` as `splitSizeInMB`
   * @param splitCount                 optional user-provided split count passed to `ReadConf`
   * @param minimalSplitCountThreshold value returned by the RDD's overridden `minimalSplitCount`
   * @return an RDD whose `minimalSplitCount` is replaced by `minimalSplitCountThreshold`
   */
  private def getCassandraTableScanRDD(
      splitSizeMB: Int,
      splitCount: Option[Int] = None,
      minimalSplitCountThreshold: Int): CassandraTableScanRDD[AnyRef] = {
    val readConf = new ReadConf(splitSizeInMB = splitSizeMB, splitCount = splitCount)

    new CassandraTableScanRDD[AnyRef](sc, conn, ks, tableName, readConf = readConf) {
      override def minimalSplitCount: Int = minimalSplitCountThreshold
    }
  }
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
package com.datastax.spark.connector.rdd.partitioner

import org.apache.cassandra.tools.NodeProbe
import org.scalatest.{Inspectors, Matchers, FlatSpec}
import com.datastax.spark.connector.rdd.partitioner.dht.TokenFactory

import com.datastax.spark.connector.SparkCassandraITFlatSpecBase
import com.datastax.spark.connector.cql.{Schema, CassandraConnector}
import com.datastax.spark.connector.embedded.{CassandraRunner, SparkTemplate, EmbeddedCassandra}
import com.datastax.spark.connector.rdd.CqlWhereClause
import com.datastax.spark.connector.testkit.SharedEmbeddedCassandra

class CassandraPartitionGeneratorSpec
extends SparkCassandraITFlatSpecBase with Inspectors {
extends SparkCassandraITFlatSpecBase {

useCassandraConfig(Seq("cassandra-default.yaml.template"))
val conn = CassandraConnector(defaultConf)
implicit val tokenFactory = TokenFactory.forSystemLocalPartitioner(conn)

conn.withSessionDo { session =>
createKeyspace(session)
Expand All @@ -25,7 +22,7 @@ class CassandraPartitionGeneratorSpec
// Should be improved in the future.
private def testPartitionCount(numPartitions: Int, min: Int, max: Int): Unit = {
val table = Schema.fromCassandra(conn, Some(ks), Some("empty")).tables.head
val partitioner = CassandraPartitionGenerator(conn, table, Some(numPartitions), 10000)
val partitioner = CassandraPartitionGenerator(conn, table, numPartitions)
val partitions = partitioner.partitions
partitions.length should be >= min
partitions.length should be <= max
Expand All @@ -39,44 +36,4 @@ class CassandraPartitionGeneratorSpec
it should "create about 10000 partitions when splitCount == 10000" in {
testPartitionCount(10000, 9000, 11000)
}

it should "create multiple partitions if the amount of data is big enough" in {
val tableName = "data"
conn.withSessionDo { session =>
session.execute(s"CREATE TABLE $ks.$tableName(key int primary key, value text)")
val st = session.prepare(s"INSERT INTO $ks.$tableName(key, value) VALUES(?, ?)")
// 1M rows x 64 bytes of payload = 64 MB of data + overhead
for (i <- (1 to 1000000).par) {
val key = i.asInstanceOf[AnyRef]
val value = "123456789.123456789.123456789.123456789.123456789.123456789."
session.execute(st.bind(key, value))
}
}

for (host <- conn.hosts) {
val nodeProbe = new NodeProbe(host.getHostAddress,
EmbeddedCassandra.cassandraRunners(0).map(_.jmxPort).getOrElse(CassandraRunner.DefaultJmxPort))
nodeProbe.forceKeyspaceFlush(ks, tableName)
}

val timeout = CassandraRunner.SizeEstimatesUpdateIntervalInSeconds * 1000 * 5
assert(DataSizeEstimates.waitForDataSizeEstimates(conn, ks, tableName, timeout),
s"Data size estimates not present after $timeout ms. Test cannot be finished.")

val table = Schema.fromCassandra(conn, Some(ks), Some(tableName)).tables.head
val partitioner = CassandraPartitionGenerator(conn, table, splitCount = None, splitSize = 1000000)
val partitions = partitioner.partitions

// theoretically there should be 64 splits, but it is ok to be "a little" inaccurate
partitions.length should be >= 16
partitions.length should be <= 256
}

it should "align index fields of partitions with their place in the array" in {
val table = Schema.fromCassandra(conn, Some(ks), Some("data")).tables.head
val partitioner = CassandraPartitionGenerator(conn, table, splitCount = Some(1000), splitSize = 100)
val partToIndex = partitioner.partitions.zipWithIndex
forAll (partToIndex) { case (part, index) => part.index should be (index) }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import java.io.IOException

import com.datastax.spark.connector._
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector.rdd.partitioner._
import com.datastax.spark.connector.rdd.partitioner.{CassandraPartition, CassandraPartitionGenerator, CqlTokenRange, DataSizeEstimates, NodeAddresses, _}
import com.datastax.spark.connector.rdd.partitioner.dht.{Token => ConnectorToken}
import com.datastax.spark.connector.rdd.reader._
import com.datastax.spark.connector.types.ColumnType
Expand Down Expand Up @@ -72,7 +72,8 @@ class CassandraTableScanRDD[R] private[connector](
val classTag: ClassTag[R],
@transient val rowReaderFactory: RowReaderFactory[R])
extends CassandraRDD[R](sc, Seq.empty)
with CassandraTableRowReaderProvider[R] {
with CassandraTableRowReaderProvider[R]
with SplitSizeEstimator[R] {

override type Self = CassandraTableScanRDD[R]

Expand Down Expand Up @@ -218,9 +219,10 @@ class CassandraTableScanRDD[R] private[connector](

@transient lazy val partitionGenerator = {
if (containsPartitionKey(where)) {
CassandraPartitionGenerator(connector, tableDef, Some(1), splitSize)
CassandraPartitionGenerator(connector, tableDef, 1)
} else {
CassandraPartitionGenerator(connector, tableDef, splitCount, splitSize)
val reevaluatedSplitCount = splitCount.getOrElse(estimateSplitCount(splitSize))
CassandraPartitionGenerator(connector, tableDef, reevaluatedSplitCount)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,10 @@ object ReadConf {
name = "spark.cassandra.input.split.size_in_mb",
section = ReferenceSection,
default = 64,
description = """Approx amount of data to be fetched into a Spark partition""")
description =
"""Approx amount of data to be fetched into a Spark partition. Minimum number of resulting Spark
| partitions is <code>1 + 2 * SparkContext.defaultParallelism</code>
|""".stripMargin.filter(_ >= ' '))

val FetchSizeInRowsParam = ConfigParameter[Int](
name = "spark.cassandra.input.fetch.size_in_rows",
Expand Down
Loading

0 comments on commit 47dd1c7

Please sign in to comment.