Merge pull request #253 from harveyfeng/tablescan
Correctly detect in-memory RDDs for table scans.
rxin authored and harveyfeng committed Jan 28, 2014
1 parent 5c271bb commit eda79ab
Showing 2 changed files with 14 additions and 9 deletions.
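For context on the change below: CacheType.shouldCache(cacheMode) asks "is this table marked for caching?", driven purely by the shark.cache table property. The fix instead asks "is this table's data actually in memory right now?". The two answers diverge after a Shark metastore restart: a table still carries shark.cache in its properties, but its RDD has not been rematerialized, so a scan must fall back to the table's own on-disk deserializer rather than ColumnarSerDe. A minimal sketch of the distinction, reusing the calls that appear in the diff (the two val names are ours, purely illustrative):

    // Illustrative only -- not the operator's actual code path.
    // Old check: the table is merely *marked* cached in its properties.
    val markedCached = CacheType.shouldCache(cacheMode)

    // New check: a live RDD is registered for this table, or it lives in Tachyon.
    val actuallyInMemory =
      SharkEnv.memoryMetadataManager.containsTable(table.getDbName, table.getTableName) ||
        cacheMode == CacheType.TACHYON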
19 changes: 10 additions & 9 deletions src/main/scala/shark/execution/TableScanOperator.scala
@@ -37,7 +37,9 @@ import org.apache.spark.rdd.{PartitionPruningRDD, RDD}
 
 import shark.{LogHelper, SharkConfVars, SharkEnv}
 import shark.execution.optimization.ColumnPruner
-import shark.memstore2.{CacheType, ColumnarSerDe, MemoryMetadataManager}
+import shark.memstore2.CacheType
+import shark.memstore2.CacheType._
+import shark.memstore2.{ColumnarSerDe, MemoryMetadataManager}
 import shark.memstore2.{TablePartition, TablePartitionStats}
 import shark.util.HiveUtils
 
@@ -70,22 +72,25 @@ class TableScanOperator extends TopOperator[TableScanDesc] {
 
   @BeanProperty var tableDesc: TableDesc = _
 
   // True if table data is stored in the Spark heap.
   @BeanProperty var isInMemoryTableScan: Boolean = _
 
+  @BeanProperty var cacheMode: CacheType.CacheType = _
+
+
   override def initializeOnMaster() {
     // Create a local copy of the HiveConf that will be assigned job properties and, for disk reads,
     // broadcasted to slaves.
     localHConf = new HiveConf(super.hconf)
+    cacheMode = CacheType.fromString(
+      tableDesc.getProperties().get("shark.cache").asInstanceOf[String])
     isInMemoryTableScan = SharkEnv.memoryMetadataManager.containsTable(
       table.getDbName, table.getTableName)
   }
 
   override def outputObjectInspector() = {
-    val cacheMode = CacheType.fromString(
-      tableDesc.getProperties().get("shark.cache").asInstanceOf[String])
     if (parts == null) {
-      val serializer = if (CacheType.shouldCache(cacheMode)) {
+      val serializer = if (isInMemoryTableScan || cacheMode == CacheType.TACHYON) {
         new ColumnarSerDe
       } else {
         tableDesc.getDeserializerClass().newInstance()
@@ -94,7 +99,7 @@ class TableScanOperator extends TopOperator[TableScanDesc] {
       serializer.getObjectInspector()
     } else {
       val partProps = firstConfPartDesc.getProperties()
-      val partSerDe = if (CacheType.shouldCache(cacheMode)) {
+      val partSerDe = if (isInMemoryTableScan || cacheMode == CacheType.TACHYON) {
         new ColumnarSerDe
       } else {
         firstConfPartDesc.getDeserializerClass().newInstance()
@@ -115,8 +120,6 @@ class TableScanOperator extends TopOperator[TableScanDesc] {
     // 1. Spark heap (block manager), accessed through the Shark MemoryMetadataManager
     // 2. Tachyon table
     // 3. Hive table on HDFS (or other Hadoop storage)
-    val cacheMode = CacheType.fromString(
-      tableDesc.getProperties().get("shark.cache").asInstanceOf[String])
     // TODO(harvey): Pruning Hive-partitioned, cached tables isn't supported yet.
     if (isInMemoryTableScan || cacheMode == CacheType.TACHYON) {
       if (isInMemoryTableScan) {
@@ -147,8 +150,6 @@ class TableScanOperator extends TopOperator[TableScanDesc] {
     // the input table and we have statistics on the table.
     val columnsUsed = new ColumnPruner(this, table).columnsUsed
 
-    val cacheMode = CacheType.fromString(
-      tableDesc.getProperties().get("shark.cache").asInstanceOf[String])
     if (!table.isPartitioned && cacheMode == CacheType.TACHYON) {
       SharkEnv.tachyonUtil.pushDownColumnPruning(rdd, columnsUsed)
     }
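Why hoist cacheMode into initializeOnMaster()? Before this change, three separate call sites (outputObjectInspector(), the RDD-selection code, and the column-pruning path above) each re-parsed the table properties; now the mode is computed once on the master, right next to the isInMemoryTableScan lookup it must stay consistent with. A hypothetical helper, not part of the diff, that states the invariant the operator now relies on:

    // Both inputs are derived together in initializeOnMaster(), so a SerDe
    // decision never pairs a fresh cacheMode with a stale in-memory flag.
    def usesColumnarSerDe(isInMemoryTableScan: Boolean, cacheMode: CacheType.CacheType): Boolean =
      isInMemoryTableScan || cacheMode == CacheType.TACHYON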
4 changes: 4 additions & 0 deletions src/test/scala/shark/SQLSuite.scala
@@ -1042,6 +1042,10 @@ class SQLSuite extends FunSuite {
       val cachedCount = cachedTableCounts(i)
       assert(onDiskCount == cachedCount, """Num rows for %s differ across Shark metastore restart.
         (rows cached = %s, rows on disk = %s)""".format(tableName, cachedCount, onDiskCount))
+      // Check that we're able to materialize a row - i.e., make sure that the table scan
+      // operator doesn't try to use a ColumnarSerDe when scanning contents on disk (for our
+      // test tables, LazySimpleSerDes should be used).
+      sc.sql("select * from %s limit 1".format(tableName))
     }
     // Finally, reload all tables.
     SharkRunner.loadTables()
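The bare query at the end of the hunk is the actual regression check: it forces the scan operator to deserialize a real row through whichever SerDe it picked, something the row-count comparison above does not exercise. With the old shouldCache-based logic, a table tagged shark.cache but currently resident only on disk would be read with a ColumnarSerDe, and the query would fail. If the intent should be explicit rather than relying on the statement not throwing, a stricter variant might look like this (assuming, as elsewhere in this suite, that sc.sql returns the fetched rows as a Seq of strings):

    // Hypothetical stricter form of the check added above.
    val rows = sc.sql("select * from %s limit 1".format(tableName))
    assert(rows.nonEmpty,
      "expected to materialize a row from the on-disk contents of " + tableName)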
