From 86f0de138f0ce3a11aedd2c400898398a38bcce0 Mon Sep 17 00:00:00 2001 From: artemaliev Date: Thu, 19 May 2016 18:38:08 +0300 Subject: [PATCH 1/2] SPARKC-383: cache all common CassandraRow data in a single CassandraRowMetaData object One CassandraRowMetaData per resultSet/Partition. That reduce object creation time and memory footprint. At this momement it is: 1. Java driver codec 2. Map comlumn names -> index for get by name methods 3. CqlColumnNames -> index --- .../writer/RoutingKeyGeneratorSpec.scala | 12 +- .../japi/GenericJavaRowReaderFactory.java | 7 +- .../spark/connector/CassandraRow.scala | 168 +++++++++++++----- .../spark/connector/GettableData.scala | 58 +++--- .../spark/connector/ScalaGettableData.scala | 4 +- .../datastax/spark/connector/UDTValue.scala | 2 + .../datastax/spark/connector/cql/Schema.scala | 2 +- .../spark/connector/japi/CassandraRow.scala | 48 +++-- .../connector/japi/JavaGettableData.scala | 4 +- .../spark/connector/japi/UDTValue.scala | 9 +- .../connector/rdd/CassandraJoinRDD.scala | 5 +- .../connector/rdd/CassandraTableScanRDD.scala | 6 +- .../rdd/reader/ClassBasedRowReader.scala | 14 +- .../rdd/reader/FunctionBasedRowReader.scala | 26 +-- .../GettableDataToMappedTypeConverter.scala | 17 +- .../rdd/reader/KeyValueRowReader.scala | 6 +- .../connector/rdd/reader/RowReader.scala | 7 +- .../rdd/reader/RowReaderFactory.scala | 8 +- .../connector/rdd/reader/ValueRowReader.scala | 4 +- .../spark/sql/cassandra/CassandraSQLRow.scala | 23 ++- 20 files changed, 276 insertions(+), 154 deletions(-) diff --git a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/RoutingKeyGeneratorSpec.scala b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/RoutingKeyGeneratorSpec.scala index 5fb7c5f44..e17fe1e9b 100644 --- a/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/RoutingKeyGeneratorSpec.scala +++ b/spark-cassandra-connector/src/it/scala/com/datastax/spark/connector/writer/RoutingKeyGeneratorSpec.scala @@ -1,12 +1,10 @@ package com.datastax.spark.connector.writer -import scala.concurrent.Future - +import com.datastax.spark.connector.cql.{CassandraConnector, Schema} +import com.datastax.spark.connector.{CassandraRow, CassandraRowMetadata, SparkCassandraITFlatSpecBase} import org.apache.cassandra.dht.IPartitioner -import com.datastax.spark.connector.cql.{CassandraConnector, Schema} -import com.datastax.spark.connector.embedded.SparkTemplate._ -import com.datastax.spark.connector.{CassandraRow, SparkCassandraITFlatSpecBase} +import scala.concurrent.Future class RoutingKeyGeneratorSpec extends SparkCassandraITFlatSpecBase { @@ -40,7 +38,7 @@ class RoutingKeyGeneratorSpec extends SparkCassandraITFlatSpecBase { session.execute(bStmt) val row = session.execute(s"""SELECT TOKEN(id) FROM $ks.one_key WHERE id = 1""").one() - val readTokenStr = CassandraRow.fromJavaDriverRow(row, Array("token(id)")).getString(0) + val readTokenStr = CassandraRow.fromJavaDriverRow(row, CassandraRowMetadata.fromColumnNames(IndexedSeq("token(id)"))).getString(0) val rk = rkg.apply(bStmt) val rkToken = cp.getToken(rk) @@ -61,7 +59,7 @@ class RoutingKeyGeneratorSpec extends SparkCassandraITFlatSpecBase { session.execute(bStmt) val row = session.execute(s"""SELECT TOKEN(id, id2) FROM $ks.two_keys WHERE id = 1 AND id2 = 'one'""").one() - val readTokenStr = CassandraRow.fromJavaDriverRow(row, Array("token(id,id2)")).getString(0) + val readTokenStr = CassandraRow.fromJavaDriverRow(row, 
CassandraRowMetadata.fromColumnNames(IndexedSeq(("token(id,id2)")))).getString(0) val rk = rkg.apply(bStmt) val rkToken = cp.getToken(rk) diff --git a/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java b/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java index 1fc401e68..2510cd8e7 100644 --- a/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java +++ b/spark-cassandra-connector/src/main/java/com/datastax/spark/connector/japi/GenericJavaRowReaderFactory.java @@ -1,6 +1,7 @@ package com.datastax.spark.connector.japi; import com.datastax.driver.core.Row; +import com.datastax.spark.connector.CassandraRowMetadata; import com.datastax.spark.connector.ColumnRef; import com.datastax.spark.connector.cql.TableDef; import com.datastax.spark.connector.rdd.reader.RowReader; @@ -31,10 +32,10 @@ private JavaRowReader() { } @Override - public CassandraRow read(Row row, String[] columnNames) { - assert row.getColumnDefinitions().size() == columnNames.length : + public CassandraRow read(Row row, CassandraRowMetadata metaData) { + assert row.getColumnDefinitions().size() == metaData.columnNames().size() : "Number of columns in a row must match the number of columns in the table metadata"; - return CassandraRow$.MODULE$.fromJavaDriverRow(row, columnNames); + return CassandraRow$.MODULE$.fromJavaDriverRow(row, metaData); } @Override diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/CassandraRow.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/CassandraRow.scala index 9b1015afd..0b84f3bea 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/CassandraRow.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/CassandraRow.scala @@ -1,6 +1,6 @@ package com.datastax.spark.connector -import com.datastax.driver.core.Row +import com.datastax.driver.core.{CodecRegistry, ResultSet, Row, TypeCodec} /** Represents a single row fetched from Cassandra. * Offers getters to read individual fields by column name or column index. 
@@ -17,25 +17,25 @@ import com.datastax.driver.core.Row * * Recommended getters for Cassandra types: * - * - `ascii`: `getString`, `getStringOption` - * - `bigint`: `getLong`, `getLongOption` - * - `blob`: `getBytes`, `getBytesOption` - * - `boolean`: `getBool`, `getBoolOption` - * - `counter`: `getLong`, `getLongOption` - * - `decimal`: `getDecimal`, `getDecimalOption` - * - `double`: `getDouble`, `getDoubleOption` - * - `float`: `getFloat`, `getFloatOption` - * - `inet`: `getInet`, `getInetOption` - * - `int`: `getInt`, `getIntOption` - * - `text`: `getString`, `getStringOption` - * - `timestamp`: `getDate`, `getDateOption` - * - `timeuuid`: `getUUID`, `getUUIDOption` - * - `uuid`: `getUUID`, `getUUIDOption` - * - `varchar`: `getString`, `getStringOption` - * - `varint`: `getVarInt`, `getVarIntOption` - * - `list`: `getList[T]` - * - `set`: `getSet[T]` - * - `map`: `getMap[K, V]` + * - `ascii`: `getString`, `getStringOption` + * - `bigint`: `getLong`, `getLongOption` + * - `blob`: `getBytes`, `getBytesOption` + * - `boolean`: `getBool`, `getBoolOption` + * - `counter`: `getLong`, `getLongOption` + * - `decimal`: `getDecimal`, `getDecimalOption` + * - `double`: `getDouble`, `getDoubleOption` + * - `float`: `getFloat`, `getFloatOption` + * - `inet`: `getInet`, `getInetOption` + * - `int`: `getInt`, `getIntOption` + * - `text`: `getString`, `getStringOption` + * - `timestamp`: `getDate`, `getDateOption` + * - `timeuuid`: `getUUID`, `getUUIDOption` + * - `uuid`: `getUUID`, `getUUIDOption` + * - `varchar`: `getString`, `getStringOption` + * - `varint`: `getVarInt`, `getVarIntOption` + * - `list`: `getList[T]` + * - `set`: `getSet[T]` + * - `map`: `getMap[K, V]` * * Collection getters `getList`, `getSet` and `getMap` require to explicitly pass an appropriate item type: * {{{ @@ -46,17 +46,17 @@ import com.datastax.driver.core.Row * * Generic `get` allows to automatically convert collections to other collection types. * Supported containers: - * - `scala.collection.immutable.List` - * - `scala.collection.immutable.Set` - * - `scala.collection.immutable.TreeSet` - * - `scala.collection.immutable.Vector` - * - `scala.collection.immutable.Map` - * - `scala.collection.immutable.TreeMap` - * - `scala.collection.Iterable` - * - `scala.collection.IndexedSeq` - * - `java.util.ArrayList` - * - `java.util.HashSet` - * - `java.util.HashMap` + * - `scala.collection.immutable.List` + * - `scala.collection.immutable.Set` + * - `scala.collection.immutable.TreeSet` + * - `scala.collection.immutable.Vector` + * - `scala.collection.immutable.Map` + * - `scala.collection.immutable.TreeMap` + * - `scala.collection.Iterable` + * - `scala.collection.IndexedSeq` + * - `java.util.ArrayList` + * - `java.util.HashSet` + * - `java.util.HashMap` * * Example: * {{{ @@ -68,16 +68,83 @@ import com.datastax.driver.core.Row * * * Timestamps can be converted to other Date types by using generic `get`. Supported date types: - * - java.util.Date - * - java.sql.Date - * - org.joda.time.DateTime + * - java.util.Date + * - java.sql.Date + * - org.joda.time.DateTime */ -final class CassandraRow(val columnNames: IndexedSeq[String], val columnValues: IndexedSeq[AnyRef]) +final class CassandraRow(val metaData: CassandraRowMetadata, val columnValues: IndexedSeq[AnyRef]) extends ScalaGettableData with Serializable { + /** + * The constructor is for testing and backward compatibility only. + * Use default constructor with shared metadata for memory saving and performance. 
+ * + * @param columnNames + * @param columnValues + */ + @deprecated("Use default constructor", "1.6.0") + def this(columnNames: IndexedSeq[String], columnValues: IndexedSeq[AnyRef]) = + this(CassandraRowMetadata.fromColumnNames(columnNames), columnValues) + override def toString = "CassandraRow" + dataAsString } +/** + * All CassandraRows shared data + * + * @param columnNames row column names + * @param resultSetColumnNames column names from java driver row result set, without connector aliases. + * @param codecs cached java driver codecs to avoid registry lookups + * + */ +case class CassandraRowMetadata(columnNames: IndexedSeq[String], + resultSetColumnNames: Option[IndexedSeq[String]] = None, + // transient because codecs are not serializable and used only at Row parsing + // not and option as deserialized fileld will be null not None + @transient private[connector] val codecs: IndexedSeq[TypeCodec[AnyRef]] = null) { + @transient + lazy val namesToIndex: Map[String, Int] = columnNames.zipWithIndex.toMap.withDefaultValue(-1) + @transient + lazy val indexOfCqlColumnOrThrow = unaliasedColumnNames.zipWithIndex.toMap.withDefault { name => + throw new ColumnNotFoundException( + s"Column not found: $name. " + + s"Available columns are: ${columnNames.mkString("[", ", ", "]")}") + } + + @transient + lazy val indexOfOrThrow = namesToIndex.withDefault { name => + throw new ColumnNotFoundException( + s"Column not found: $name. " + + s"Available columns are: ${columnNames.mkString("[", ", ", "]")}") + } + + def unaliasedColumnNames = resultSetColumnNames.getOrElse(columnNames) +} + +object CassandraRowMetadata { + + def fromResultSet(columnNames: IndexedSeq[String], rs: ResultSet) = { + import scala.collection.JavaConversions._ + val columnDefs = rs.getColumnDefinitions.asList().toList + val rsColumnNames = columnDefs.map(_.getName) + val codecs = columnDefs.map(col => CodecRegistry.DEFAULT_INSTANCE.codecFor(col.getType)) + .asInstanceOf[List[TypeCodec[AnyRef]]] + CassandraRowMetadata(columnNames, Some(rsColumnNames.toIndexedSeq), codecs.toIndexedSeq) + } + + /** + * create metadata object without codecs. Should be used for testing only + * + * @param columnNames + * @return + */ + def fromColumnNames(columnNames: IndexedSeq[String]): CassandraRowMetadata = + CassandraRowMetadata(columnNames, None) + + def fromColumnNames(columnNames: Seq[String]): CassandraRowMetadata = + fromColumnNames(columnNames.toIndexedSeq) +} + object CassandraRow { /** Deserializes first n columns from the given `Row` and returns them as @@ -86,18 +153,37 @@ object CassandraRow { * the newly created `CassandraRow`, but it is not used to fetch data from * the input `Row` in order to improve performance. Fetching column values by name is much * slower than fetching by index. */ - def fromJavaDriverRow(row: Row, columnNames: Array[String]): CassandraRow = { - val data = new Array[Object](columnNames.length) - for (i <- columnNames.indices) - data(i) = GettableData.get(row, i) - new CassandraRow(columnNames, data) + def fromJavaDriverRow(row: Row, metaData: CassandraRowMetadata): CassandraRow = { + val length = metaData.columnNames.length + var i = 0 + val data = new Array[Object](length) + + // Here we use a mutable while loop for performance reasons, scala for loops are + // converted into range.foreach() and the JVM is unable to inline the foreach closure. + // 'match' is replaced with 'if' for the same reason. + // It is also out of the loop for performance. 
+ if (metaData.codecs == null) { + //that should not happen in production, but just in case + while (i < length) { + data(i) = GettableData.get(row, i) + i += 1 + } + } + else { + while (i < length) { + data(i) = GettableData.get(row, i, metaData.codecs(i)) + i += 1 + } + } + new CassandraRow(metaData, data) } + /** Creates a CassandraRow object from a map with keys denoting column names and * values denoting column values. */ def fromMap(map: Map[String, Any]): CassandraRow = { val (columnNames, values) = map.unzip - new CassandraRow(columnNames.toIndexedSeq, values.map(_.asInstanceOf[AnyRef]).toIndexedSeq) + new CassandraRow(CassandraRowMetadata.fromColumnNames(columnNames.toIndexedSeq), values.map(_.asInstanceOf[AnyRef]).toIndexedSeq) } } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala index caf265085..3f38e0db3 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/GettableData.scala @@ -4,53 +4,50 @@ import java.nio.ByteBuffer import scala.collection.JavaConversions._ -import com.datastax.driver.core.{LocalDate, Row, TupleValue => DriverTupleValue, UDTValue => DriverUDTValue} +import com.datastax.driver.core.{LocalDate, Row, TypeCodec, TupleValue => DriverTupleValue, UDTValue => DriverUDTValue} import com.datastax.spark.connector.types.TypeConverter.StringConverter import com.datastax.spark.connector.util.ByteBufferUtil trait GettableData extends GettableByIndexData { - def columnNames: IndexedSeq[String] + def metaData: CassandraRowMetadata - @transient - private[connector] lazy val _indexOf = - columnNames.zipWithIndex.toMap.withDefaultValue(-1) - - @transient - private[connector] lazy val _indexOfOrThrow = _indexOf.withDefault { name => - throw new ColumnNotFoundException( - s"Column not found: $name. " + - s"Available columns are: ${columnNames.mkString("[", ", ", "]")}") - } - - /** Returns a column value by index without applying any conversion. + /** Returns a column value by aliased name without applying any conversion. * The underlying type is the same as the type returned by the low-level Cassandra driver, * is implementation defined and may change in the future. * Cassandra nulls are returned as Scala nulls. */ - def getRaw(name: String): AnyRef = columnValues(_indexOfOrThrow(name)) + def getRaw(name: String): AnyRef = columnValues(metaData.indexOfOrThrow(name)) + + /** + * Returns a column value by cql Name + * @param name + * @return + */ + def getRawCql(name: String): AnyRef = columnValues(metaData.indexOfCqlColumnOrThrow(name)) + /** Returns true if column value is Cassandra null */ def isNullAt(name: String): Boolean = { - columnValues(_indexOfOrThrow(name)) == null + columnValues(metaData.indexOfOrThrow(name)) == null } /** Returns index of column with given name or -1 if column not found */ def indexOf(name: String): Int = - _indexOf(name) + metaData.namesToIndex(name) /** Returns the name of the i-th column. */ def nameOf(index: Int): String = - columnNames(index) + metaData.columnNames(index) /** Returns true if column with given name is defined and has an * entry in the underlying value array, i.e. was requested in the result set. * For columns having null value, returns true. 
*/ def contains(name: String): Boolean = - _indexOf(name) != -1 + metaData.namesToIndex(name) != -1 /** Displays the content in human readable form, including the names and values of the columns */ override def dataAsString = - columnNames + metaData.columnNames .zip(columnValues) .map(kv => kv._1 + ": " + StringConverter.convert(kv._2)) .mkString("{", ", ", "}") @@ -59,13 +56,13 @@ trait GettableData extends GettableByIndexData { override def equals(o: Any) = o match { case o: GettableData if - this.columnNames == o.columnNames && + this.metaData == o.metaData && this.columnValues == o.columnValues => true case _ => false } override def hashCode = - columnNames.hashCode * 31 + columnValues.hashCode + metaData.hashCode * 31 + columnValues.hashCode } object GettableData { @@ -96,12 +93,29 @@ object GettableData { null } + + /** Deserializes given field from the DataStax Java Driver `Row` into appropriate Java type by using predefined codec + * If the field is null, returns null (not Scala Option). */ + def get(row: Row, index: Int, codec: TypeCodec[AnyRef]): AnyRef = { + val data = row.get(index, codec) + if (data != null) + convert(data) + else + null + } + def get(row: Row, name: String): AnyRef = { val index = row.getColumnDefinitions.getIndexOf(name) require(index >= 0, s"Column not found in Java driver Row: $name") get(row, index) } + def get(row: Row, name: String, codec: TypeCodec[AnyRef]): AnyRef = { + val index = row.getColumnDefinitions.getIndexOf(name) + require(index >= 0, s"Column not found in Java driver Row: $name") + get(row, index, codec) + } + def get(value: DriverUDTValue, name: String): AnyRef = { val quotedName = "\"" + name + "\"" val data = value.getObject(quotedName) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/ScalaGettableData.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/ScalaGettableData.scala index 079de42f6..6b9828b2d 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/ScalaGettableData.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/ScalaGettableData.scala @@ -12,12 +12,12 @@ trait ScalaGettableData extends ScalaGettableByIndexData with GettableData { /** Converts this row to a Map */ def toMap: Map[String, Any] = - columnNames.zip(columnValues).toMap + metaData.columnNames.zip(columnValues).toMap /** Generic getter for getting columns of any type. * Looks the column up by column name. Column names are case-sensitive.*/ def get[T](name: String)(implicit c: TypeConverter[T]): T = - get[T](_indexOfOrThrow(name)) + get[T](metaData.indexOfOrThrow(name)) /** Returns a `bool` column value. Besides working with `bool` Cassandra type, it can also read * numbers and strings. Non-zero numbers are converted to `true`, zero is converted to `false`. 
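
The hunks above route all by-name lookups through the shared `CassandraRowMetadata` instead of a per-row name-to-index map. A minimal usage sketch of the new API, assuming a hypothetical `session` and a `test.users` table with `id` and `name` columns (neither is part of the patch): one metadata object is built per driver `ResultSet` and reused for every row it produces.

```scala
import scala.collection.JavaConversions._
import com.datastax.spark.connector.{CassandraRow, CassandraRowMetadata}

// `session` and the query are assumptions for illustration only.
val rs = session.execute("SELECT id, name FROM test.users")

// Built once per ResultSet/partition: caches the driver codecs and the
// name -> index maps that each CassandraRow used to rebuild lazily.
val metaData = CassandraRowMetadata.fromResultSet(IndexedSeq("id", "name"), rs)

// Every row produced from this ResultSet shares the same metadata instance.
val rows: Iterator[CassandraRow] =
  rs.iterator().map(row => CassandraRow.fromJavaDriverRow(row, metaData))
```

`fromJavaDriverRow` then deserializes each column with the pre-resolved codec (`metaData.codecs(i)`), falling back to the per-call registry lookup only when codecs were not populated, e.g. for metadata built with the test-only `fromColumnNames`.
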
diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/UDTValue.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/UDTValue.scala index 675b17b46..65c49c0c8 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/UDTValue.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/UDTValue.scala @@ -12,6 +12,8 @@ final case class UDTValue(columnNames: IndexedSeq[String], columnValues: Indexed extends ScalaGettableData { override def productArity: Int = columnValues.size override def productElement(i: Int) = columnValues(i) + + override def metaData = CassandraRowMetadata.fromColumnNames(columnNames) } object UDTValue { diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala index 90d36b5d9..e2d463ebf 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/cql/Schema.scala @@ -200,7 +200,7 @@ case class TableDef( type ValueRepr = CassandraRow def newInstance(columnValues: Any*): CassandraRow = { - new CassandraRow(columnNames, columnValues.toIndexedSeq.map(_.asInstanceOf[AnyRef])) + new CassandraRow(CassandraRowMetadata.fromColumnNames(columnNames), columnValues.toIndexedSeq.map(_.asInstanceOf[AnyRef])) } } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/CassandraRow.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/CassandraRow.scala index 281fdf664..eba95e80b 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/CassandraRow.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/CassandraRow.scala @@ -1,17 +1,24 @@ package com.datastax.spark.connector.japi import com.datastax.driver.core.Row -import com.datastax.spark.connector.GettableData +import com.datastax.spark.connector.{CassandraRowMetadata, GettableData} -final class CassandraRow(val columnNames: IndexedSeq[String], val columnValues: IndexedSeq[AnyRef]) +final class CassandraRow(val metaData:CassandraRowMetadata, val columnValues: IndexedSeq[AnyRef]) extends JavaGettableData with Serializable { - private[spark] def this() = this(null: IndexedSeq[String], null) // required by Kryo for deserialization :( + private[spark] def this() = this(null: CassandraRowMetadata, null: IndexedSeq[AnyRef]) // required by Kryo for deserialization :( + def this(metaData: CassandraRowMetadata, columnValues: Array[AnyRef]) = + this(metaData, columnValues.toIndexedSeq) + + /** + * the consturctor is for testing and backward compatibility only. + * Use default constructor with shared metadata for memory saving and performance. + */ def this(columnNames: Array[String], columnValues: Array[AnyRef]) = - this(columnNames.toIndexedSeq, columnValues.toIndexedSeq) + this(CassandraRowMetadata.fromColumnNames(columnNames.toIndexedSeq), columnValues.toIndexedSeq) - protected def fieldNames = columnNames + protected def fieldNames = metaData protected def fieldValues = columnValues def iterator = columnValues.iterator @@ -27,18 +34,35 @@ object CassandraRow { * the newly created `CassandraRow`, but it is not used to fetch data from * the input `Row` in order to improve performance. Fetching column values by name is much * slower than fetching by index. 
*/ - def fromJavaDriverRow(row: Row, columnNames: Array[String]): CassandraRow = { - val data = new Array[Object](columnNames.length) - for (i <- columnNames.indices) - data(i) = GettableData.get(row, i) - new CassandraRow(columnNames, data) - } + def fromJavaDriverRow(row: Row, metaData: CassandraRowMetadata): CassandraRow = { + val length = metaData.columnNames.length + var i = 0 + val data = new Array[Object](length) + // Here we use a mutable while loop for performance reasons, scala for loops are + // converted into range.foreach() and the JVM is unable to inline the foreach closure. + // 'match' is replaced with 'if' for the same reason. + // It is also out of the loop for performance. + if (metaData.codecs == null) { + //that should not happen in production, but just in case + while (i < length) { + data(i) = GettableData.get(row, i) + i += 1 + } + } + else { + while (i < length) { + data(i) = GettableData.get(row, i, metaData.codecs(i)) + i += 1 + } + } + new CassandraRow(metaData, data) + } /** Creates a CassandraRow object from a map with keys denoting column names and * values denoting column values. */ def fromMap(map: Map[String, Any]): CassandraRow = { val (columnNames, values) = map.unzip - new CassandraRow(columnNames.toArray, values.map(_.asInstanceOf[AnyRef]).toArray) + new CassandraRow(CassandraRowMetadata.fromColumnNames(columnNames.toIndexedSeq), values.map(_.asInstanceOf[AnyRef]).toArray) } } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/JavaGettableData.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/JavaGettableData.scala index cd4463dc4..b475595f2 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/JavaGettableData.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/JavaGettableData.scala @@ -17,7 +17,7 @@ trait JavaGettableData extends JavaGettableByIndexData with GettableData { /** Converts this row to a Map */ def toMap: JMap[String, AnyRef] = { val map = new JHashMap[String, AnyRef]() - for (i <- 0 until length) map.put(columnNames(i), columnValues(i)) + for (i <- 0 until length) map.put(metaData.columnNames(i), columnValues(i)) map } @@ -29,7 +29,7 @@ trait JavaGettableData extends JavaGettableByIndexData with GettableData { /** Generic getter for getting columns of any type. * Looks the column up by column name. 
Column names are case-sensitive.*/ private def _get[T <: AnyRef](name: String)(implicit tc: TypeConverter[T]): T = - tc.convert(columnValues(_indexOfOrThrow(name))) + tc.convert(columnValues(metaData.indexOfOrThrow(name))) /** Equivalent to `getAny` */ def apply(name: String): AnyRef = getObject(name) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/UDTValue.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/UDTValue.scala index fce350644..a14ba0333 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/UDTValue.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/UDTValue.scala @@ -1,11 +1,10 @@ package com.datastax.spark.connector.japi import scala.reflect.runtime.universe._ +import com.datastax.spark.connector.types.{NullableTypeConverter, TypeConverter} +import com.datastax.spark.connector.{CassandraRowMetadata, UDTValue => ConnectorUDTValue} -import com.datastax.spark.connector.types.{TypeConverter, NullableTypeConverter} -import com.datastax.spark.connector.{UDTValue => ConnectorUDTValue} - -final class UDTValue(val columnNames: IndexedSeq[String], val columnValues: IndexedSeq[AnyRef]) +final class UDTValue(val metaData: CassandraRowMetadata, val columnValues: IndexedSeq[AnyRef]) extends JavaGettableData with Serializable object UDTValue { @@ -18,7 +17,7 @@ object UDTValue { def convertPF = { case x: UDTValue => x case x: ConnectorUDTValue => - new UDTValue(x.columnNames, x.columnValues) + new UDTValue(x.metaData, x.columnValues) } } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraJoinRDD.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraJoinRDD.scala index 958ec9268..8826e80f7 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraJoinRDD.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraJoinRDD.scala @@ -239,7 +239,7 @@ class CassandraJoinRDD[L, R] private[connector]( session: Session, bsb: BoundStatementBuilder[L], leftIterator: Iterator[L]): Iterator[(L, R)] = { - val columnNamesArray = selectedColumnRefs.map(_.selectedAs).toArray + val columnNames = selectedColumnRefs.map(_.selectedAs).toIndexedSeq val rateLimiter = new RateLimiter( readConf.throughputJoinQueryPerSec, readConf.throughputJoinQueryPerSec) @@ -251,7 +251,8 @@ class CassandraJoinRDD[L, R] private[connector]( Futures.addCallback(queryFuture, new FutureCallback[ResultSet] { def onSuccess(rs: ResultSet) { val resultSet = new PrefetchingResultSetIterator(rs, fetchSize) - val rightSide = resultSet.map(rowReader.read(_, columnNamesArray)) + val columnMetaData = CassandraRowMetadata.fromResultSet(columnNames,rs); + val rightSide = resultSet.map(rowReader.read(_, columnMetaData)) resultFuture.set(leftSide.zip(rightSide)) } def onFailure(throwable: Throwable) { diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala index f76726eb9..0c3080f95 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/CassandraTableScanRDD.scala @@ -304,13 +304,15 @@ class CassandraTableScanRDD[R] private[connector]( s"with $cql " + s"with 
params ${values.mkString("[", ",", "]")}") val stmt = createStatement(session, cql, values: _*) - val columnNamesArray = selectedColumnRefs.map(_.selectedAs).toArray try { val rs = session.execute(stmt) + val columnNames = selectedColumnRefs.map(_.selectedAs).toIndexedSeq + val columnMetaData = CassandraRowMetadata.fromResultSet(columnNames,rs) + val iterator = new PrefetchingResultSetIterator(rs, fetchSize) val iteratorWithMetrics = iterator.map(inputMetricsUpdater.updateMetrics) - val result = iteratorWithMetrics.map(rowReader.read(_, columnNamesArray)) + val result = iteratorWithMetrics.map(rowReader.read(_, columnMetaData)) logDebug(s"Row iterator for range ${range.cql(partitionKeyStr)} obtained successfully.") result } catch { diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/ClassBasedRowReader.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/ClassBasedRowReader.scala index 694814ab0..6b3f51ed0 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/ClassBasedRowReader.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/ClassBasedRowReader.scala @@ -1,15 +1,13 @@ package com.datastax.spark.connector.rdd.reader -import scala.collection.JavaConversions._ -import scala.reflect.runtime.universe._ - -import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock - import com.datastax.driver.core.Row import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.TableDef import com.datastax.spark.connector.mapper._ import com.datastax.spark.connector.util.JavaApiHelper +import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock + +import scala.reflect.runtime.universe._ /** Transforms a Cassandra Java driver `Row` into an object of a user provided class, @@ -31,10 +29,8 @@ final class ClassBasedRowReader[R : TypeTag : ColumnMapper]( Some(ctorRefs ++ setterRefs) } - override def read(row: Row, ignored: Array[String]): R = { - // can't use passed array of column names, because it is already after applying aliases - val columnNames = row.getColumnDefinitions.iterator.map(_.getName).toArray - val cassandraRow = CassandraRow.fromJavaDriverRow(row, columnNames) + override def read(row: Row, rowMetaData: CassandraRowMetadata): R = { + val cassandraRow = CassandraRow.fromJavaDriverRow(row, rowMetaData) converter.convert(cassandraRow) } } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/FunctionBasedRowReader.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/FunctionBasedRowReader.scala index 839d195df..27e10ffed 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/FunctionBasedRowReader.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/FunctionBasedRowReader.scala @@ -1,7 +1,7 @@ package com.datastax.spark.connector.rdd.reader import com.datastax.driver.core.Row -import com.datastax.spark.connector.GettableData +import com.datastax.spark.connector.{CassandraRowMetadata, GettableData} import com.datastax.spark.connector.types.TypeConverter import scala.reflect.ClassTag @@ -19,7 +19,7 @@ trait FunctionBasedRowReader[R] extends RowReader[R] with ThisRowReaderAsFactory class FunctionBasedRowReader1[R, A0](f: A0 => R)( implicit a0c: TypeConverter[A0], @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def 
read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f(a0c.convert(GettableData.get(row, 0))) } @@ -31,7 +31,7 @@ class FunctionBasedRowReader2[R, A0, A1](f: (A0, A1) => R)( @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)) @@ -46,7 +46,7 @@ class FunctionBasedRowReader3[R, A0, A1, A2](f: (A0, A1, A2) => R)( @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)), @@ -63,7 +63,7 @@ class FunctionBasedRowReader4[R, A0, A1, A2, A3](f: (A0, A1, A2, A3) => R)( @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)), @@ -82,7 +82,7 @@ class FunctionBasedRowReader5[R, A0, A1, A2, A3, A4](f: (A0, A1, A2, A3, A4) => @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)), @@ -103,7 +103,7 @@ class FunctionBasedRowReader6[R, A0, A1, A2, A3, A4, A5](f: (A0, A1, A2, A3, A4, @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)), @@ -126,7 +126,7 @@ class FunctionBasedRowReader7[R, A0, A1, A2, A3, A4, A5, A6](f: (A0, A1, A2, A3, @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)), @@ -152,7 +152,7 @@ class FunctionBasedRowReader8[R, A0, A1, A2, A3, A4, A5, A6, A7] @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)), @@ -180,7 +180,7 @@ class FunctionBasedRowReader9[R, A0, A1, A2, A3, A4, A5, A6, A7, A8] @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)), @@ -210,7 +210,7 @@ class FunctionBasedRowReader10[R, A0, A1, A2, A3, A4, A5, A6, A7, A8, A9] @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)), @@ -242,7 
+242,7 @@ class FunctionBasedRowReader11[R, A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10] @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)), @@ -276,7 +276,7 @@ class FunctionBasedRowReader12[R, A0, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A @transient override val ct: ClassTag[R]) extends FunctionBasedRowReader[R] { - override def read(row: Row, columnNames: Array[String]) = + override def read(row: Row, rowMetaData: CassandraRowMetadata) = f( a0c.convert(GettableData.get(row, 0)), a1c.convert(GettableData.get(row, 1)), diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/GettableDataToMappedTypeConverter.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/GettableDataToMappedTypeConverter.scala index 41360b680..a6f7fe957 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/GettableDataToMappedTypeConverter.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/GettableDataToMappedTypeConverter.scala @@ -2,16 +2,15 @@ package com.datastax.spark.connector.rdd.reader import java.lang.reflect.Method -import scala.language.existentials -import scala.reflect.runtime.universe._ - +import com.datastax.spark.connector._ +import com.datastax.spark.connector.cql.StructDef +import com.datastax.spark.connector.mapper.{ColumnMapper, DefaultColumnMapper, JavaBeanColumnMapper, TupleColumnMapper} +import com.datastax.spark.connector.types._ +import com.datastax.spark.connector.util.{ReflectionUtil, Symbols} import org.apache.spark.sql.catalyst.ReflectionLock.SparkReflectionLock -import com.datastax.spark.connector.cql.StructDef -import com.datastax.spark.connector.mapper.{JavaBeanColumnMapper, TupleColumnMapper, ColumnMapper, DefaultColumnMapper} -import com.datastax.spark.connector.types.{TupleType, MapType, SetType, ListType, TypeConversionException, BigIntType, ColumnType, TypeConverter, UserDefinedType} -import com.datastax.spark.connector.util.{Symbols, ReflectionUtil} -import com.datastax.spark.connector._ +import scala.language.existentials +import scala.reflect.runtime.universe._ /** Converts a `GettableData` object representing a table row or a UDT value * to a tuple or a case class object using the given `ColumnMapper`. 
@@ -159,7 +158,7 @@ private[connector] class GettableDataToMappedTypeConverter[T : TypeTag : ColumnM data: GettableData, converter: TypeConverter[_]): AnyRef = { val name = columnRef.columnName - val value = data.getRaw(columnRef.cqlValueName) + val value = data.getRawCql(columnRef.cqlValueName) checkNotNull(tryConvert(value, converter, name), name) } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/KeyValueRowReader.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/KeyValueRowReader.scala index 4bf4f0595..331b61807 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/KeyValueRowReader.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/KeyValueRowReader.scala @@ -1,7 +1,7 @@ package com.datastax.spark.connector.rdd.reader import com.datastax.driver.core.Row -import com.datastax.spark.connector.{ColumnSelector, ColumnRef} +import com.datastax.spark.connector.{CassandraRowMetadata, ColumnRef, ColumnSelector} import com.datastax.spark.connector.cql.TableDef private[connector] class KeyValueRowReaderFactory[K, V]( @@ -26,7 +26,7 @@ private[connector] class KeyValueRowReader[K, V](keyReader: RowReader[K], valueR (for (keyNames <- keyReader.neededColumns; valueNames <- valueReader.neededColumns) yield keyNames ++ valueNames) .orElse(keyReader.neededColumns).orElse(valueReader.neededColumns) - override def read(row: Row, columnNames: Array[String]): (K, V) = { - (keyReader.read(row, columnNames), valueReader.read(row, columnNames)) + override def read(row: Row, rowMetaData: CassandraRowMetadata): (K, V) = { + (keyReader.read(row, rowMetaData), valueReader.read(row, rowMetaData)) } } diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/RowReader.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/RowReader.scala index 2fb59dac4..3fd291f65 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/RowReader.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/RowReader.scala @@ -1,16 +1,17 @@ package com.datastax.spark.connector.rdd.reader import com.datastax.driver.core.Row -import com.datastax.spark.connector.ColumnRef +import com.datastax.spark.connector.{CassandraRowMetadata, ColumnRef} /** Transforms a Cassandra Java driver `Row` into high-level row representation, e.g. a tuple * or a user-defined case class object. The target type `T` must be serializable. */ trait RowReader[T] extends Serializable { /** Reads column values from low-level `Row` and turns them into higher level representation. + * * @param row row fetched from Cassandra - * @param columnNames column names available in the `row` */ - def read(row: Row, columnNames: Array[String]): T + * @param rowMetaData column names and codec available in the `row` */ + def read(row: Row, rowMetaData: CassandraRowMetadata): T /** List of columns this `RowReader` is going to read. * Useful to avoid fetching the columns that are not needed. 
*/ diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/RowReaderFactory.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/RowReaderFactory.scala index 599c12935..e4193e95e 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/RowReaderFactory.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/RowReaderFactory.scala @@ -3,7 +3,7 @@ package com.datastax.spark.connector.rdd.reader import java.io.Serializable import com.datastax.driver.core.Row -import com.datastax.spark.connector.{ColumnRef, CassandraRow} +import com.datastax.spark.connector.{CassandraRow, CassandraRowMetadata, ColumnRef} import com.datastax.spark.connector.cql.TableDef import com.datastax.spark.connector.mapper.ColumnMapper import com.datastax.spark.connector.types.TypeConverter @@ -57,10 +57,10 @@ object RowReaderFactory extends LowPriorityRowReaderFactoryImplicits { override def targetClass: Class[CassandraRow] = classOf[CassandraRow] - override def read(row: Row, columnNames: Array[String]) = { - assert(row.getColumnDefinitions.size() == columnNames.size, + override def read(row: Row, rowMetaData: CassandraRowMetadata) = { + assert(row.getColumnDefinitions.size() == rowMetaData.columnNames.size, "Number of columns in a row must match the number of columns in the table metadata") - CassandraRow.fromJavaDriverRow(row, columnNames) + CassandraRow.fromJavaDriverRow(row, rowMetaData) } override def neededColumns: Option[Seq[ColumnRef]] = None diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/ValueRowReader.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/ValueRowReader.scala index 902ed54ac..af41072ab 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/ValueRowReader.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/rdd/reader/ValueRowReader.scala @@ -12,8 +12,8 @@ class ValueRowReader[T: TypeConverter](columnRef: ColumnRef) extends RowReader[T /** Reads column values from low-level `Row` and turns them into higher level representation. * @param row row fetched from Cassandra - * @param columnNames column names available in the `row` */ - override def read(row: Row, columnNames: Array[String]): T = + * @param rowMetaData: column names available in the `row` */ + override def read(row: Row, rowMetaData: CassandraRowMetadata): T = converter.convert(GettableData.get(row, columnRef.cqlValueName)) /** List of columns this `RowReader` is going to read. 
diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala index af63d378d..864a717e4 100644 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala +++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala @@ -2,23 +2,22 @@ package org.apache.spark.sql.cassandra import java.net.InetAddress import java.sql.Timestamp -import java.util.{UUID, Date} +import java.util.{Date, UUID} import java.math.BigInteger import com.datastax.driver.core.Row import com.datastax.driver.core.LocalDate -import com.datastax.spark.connector.{TupleValue, UDTValue, GettableData} -import com.datastax.spark.connector.rdd.reader.{ThisRowReaderAsFactory, RowReader} +import com.datastax.spark.connector.{CassandraRowMetadata, GettableData, TupleValue, UDTValue} +import com.datastax.spark.connector.rdd.reader.{RowReader, ThisRowReaderAsFactory} import com.datastax.spark.connector.types.TypeConverter - import org.apache.spark.sql.{Row => SparkRow} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.sql.types.Decimal -final class CassandraSQLRow(val columnNames: IndexedSeq[String], val columnValues: IndexedSeq[AnyRef]) +final class CassandraSQLRow(val metaData: CassandraRowMetadata, val columnValues: IndexedSeq[AnyRef]) extends GettableData with SparkRow with Serializable { - protected def fieldNames = columnNames + protected def fieldNames = metaData private[spark] def this() = this(null, null) // required by Kryo for deserialization :( @@ -56,19 +55,19 @@ object CassandraSQLRow { lazy val defaultTimeZone = java.util.TimeZone.getDefault def subtractTimeZoneOffset( millis: Long ) = millis - defaultTimeZone.getOffset(millis) - def fromJavaDriverRow(row: Row, columnNames: Array[String]): CassandraSQLRow = { - val data = new Array[Object](columnNames.length) - for (i <- columnNames.indices) { + def fromJavaDriverRow(row: Row, metaData:CassandraRowMetadata): CassandraSQLRow = { + val data = new Array[Object](metaData.columnNames.length) + for (i <- metaData.columnNames.indices) { data(i) = GettableData.get(row, i) data.update(i, toSparkSqlType(data(i))) } - new CassandraSQLRow(columnNames, data) + new CassandraSQLRow(metaData, data) } implicit object CassandraSQLRowReader extends RowReader[CassandraSQLRow] with ThisRowReaderAsFactory[CassandraSQLRow] { - override def read(row: Row, columnNames: Array[String]): CassandraSQLRow = - fromJavaDriverRow(row, columnNames) + override def read(row: Row, metaData:CassandraRowMetadata): CassandraSQLRow = + fromJavaDriverRow(row, metaData) override def neededColumns = None override def targetClass = classOf[CassandraSQLRow] From 8ea88dd7352ac552642aea606506550855d1b6a6 Mon Sep 17 00:00:00 2001 From: artemaliev Date: Wed, 1 Jun 2016 12:54:43 +0300 Subject: [PATCH 2/2] SPARKC-383: add the optimization to DataFrame CassandraSQLRow Move common code to the CassandraRow.dataFromJavaDriverRow method --- .../spark/connector/CassandraRow.scala | 7 ++++-- .../spark/connector/japi/CassandraRow.scala | 25 ++----------------- .../spark/sql/cassandra/CassandraSQLRow.scala | 10 +++----- 3 files changed, 10 insertions(+), 32 deletions(-) diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/CassandraRow.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/CassandraRow.scala index 0b84f3bea..a67ab411a 100644 
--- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/CassandraRow.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/CassandraRow.scala @@ -154,6 +154,10 @@ object CassandraRow { * the input `Row` in order to improve performance. Fetching column values by name is much * slower than fetching by index. */ def fromJavaDriverRow(row: Row, metaData: CassandraRowMetadata): CassandraRow = { + new CassandraRow(metaData, CassandraRow.dataFromJavaDriverRow(row, metaData)) + } + + def dataFromJavaDriverRow(row: Row, metaData: CassandraRowMetadata): Array[Object] = { val length = metaData.columnNames.length var i = 0 val data = new Array[Object](length) @@ -175,10 +179,9 @@ object CassandraRow { i += 1 } } - new CassandraRow(metaData, data) + data } - /** Creates a CassandraRow object from a map with keys denoting column names and * values denoting column values. */ def fromMap(map: Map[String, Any]): CassandraRow = { diff --git a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/CassandraRow.scala b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/CassandraRow.scala index eba95e80b..81bdc84c9 100644 --- a/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/CassandraRow.scala +++ b/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/japi/CassandraRow.scala @@ -1,7 +1,7 @@ package com.datastax.spark.connector.japi import com.datastax.driver.core.Row -import com.datastax.spark.connector.{CassandraRowMetadata, GettableData} +import com.datastax.spark.connector.CassandraRowMetadata final class CassandraRow(val metaData:CassandraRowMetadata, val columnValues: IndexedSeq[AnyRef]) extends JavaGettableData with Serializable { @@ -35,28 +35,7 @@ object CassandraRow { * the input `Row` in order to improve performance. Fetching column values by name is much * slower than fetching by index. */ def fromJavaDriverRow(row: Row, metaData: CassandraRowMetadata): CassandraRow = { - val length = metaData.columnNames.length - var i = 0 - val data = new Array[Object](length) - - // Here we use a mutable while loop for performance reasons, scala for loops are - // converted into range.foreach() and the JVM is unable to inline the foreach closure. - // 'match' is replaced with 'if' for the same reason. - // It is also out of the loop for performance. - if (metaData.codecs == null) { - //that should not happen in production, but just in case - while (i < length) { - data(i) = GettableData.get(row, i) - i += 1 - } - } - else { - while (i < length) { - data(i) = GettableData.get(row, i, metaData.codecs(i)) - i += 1 - } - } - new CassandraRow(metaData, data) + new CassandraRow(metaData, com.datastax.spark.connector.CassandraRow.dataFromJavaDriverRow(row, metaData)) } /** Creates a CassandraRow object from a map with keys denoting column names and * values denoting column values. 
*/ diff --git a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala index 864a717e4..5ea743956 100644 --- a/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala +++ b/spark-cassandra-connector/src/main/scala/org/apache/spark/sql/cassandra/CassandraSQLRow.scala @@ -7,7 +7,7 @@ import java.math.BigInteger import com.datastax.driver.core.Row import com.datastax.driver.core.LocalDate -import com.datastax.spark.connector.{CassandraRowMetadata, GettableData, TupleValue, UDTValue} +import com.datastax.spark.connector.{CassandraRow, CassandraRowMetadata, GettableData, TupleValue, UDTValue} import com.datastax.spark.connector.rdd.reader.{RowReader, ThisRowReaderAsFactory} import com.datastax.spark.connector.types.TypeConverter import org.apache.spark.sql.{Row => SparkRow} @@ -56,12 +56,8 @@ object CassandraSQLRow { def subtractTimeZoneOffset( millis: Long ) = millis - defaultTimeZone.getOffset(millis) def fromJavaDriverRow(row: Row, metaData:CassandraRowMetadata): CassandraSQLRow = { - val data = new Array[Object](metaData.columnNames.length) - for (i <- metaData.columnNames.indices) { - data(i) = GettableData.get(row, i) - data.update(i, toSparkSqlType(data(i))) - } - new CassandraSQLRow(metaData, data) + val data = CassandraRow.dataFromJavaDriverRow(row, metaData) + new CassandraSQLRow(metaData, data.map(toSparkSqlType)) } implicit object CassandraSQLRowReader extends RowReader[CassandraSQLRow] with ThisRowReaderAsFactory[CassandraSQLRow] {
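
Taken together, the two patches make the Scala `CassandraRow`, the Java API row, and the Spark SQL row build their value arrays through the single `dataFromJavaDriverRow` helper, while all rows from one `ResultSet` share one `CassandraRowMetadata`. A small illustration of that sharing, assuming a three-column row built with the codec-less `fromColumnNames` test helper (production paths use `fromResultSet` so codecs are cached as well):

```scala
import com.datastax.spark.connector.{CassandraRow, CassandraRowMetadata}

// fromColumnNames skips codec caching and is intended for tests.
val meta = CassandraRowMetadata.fromColumnNames(IndexedSeq("id", "name", "ts"))

val row1 = new CassandraRow(meta, IndexedSeq[AnyRef](Int.box(1), "a", null))
val row2 = new CassandraRow(meta, IndexedSeq[AnyRef](Int.box(2), "b", null))

// One metadata instance per result set / partition, not one per row.
assert(row1.metaData eq row2.metaData)
println(row1.getString("name")) // by-name lookups resolve via meta.indexOfOrThrow
```
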