From eda79ab528ca343e33fbdb9da9bb313f3e484ecc Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 27 Jan 2014 19:34:05 -0800 Subject: [PATCH] Merge pull request #253 from harveyfeng/tablescan Correctly detect in-memory RDDs for table scans. --- .../shark/execution/TableScanOperator.scala | 19 ++++++++++--------- src/test/scala/shark/SQLSuite.scala | 4 ++++ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/src/main/scala/shark/execution/TableScanOperator.scala b/src/main/scala/shark/execution/TableScanOperator.scala index c5684ca2..eaba7e9b 100755 --- a/src/main/scala/shark/execution/TableScanOperator.scala +++ b/src/main/scala/shark/execution/TableScanOperator.scala @@ -37,7 +37,9 @@ import org.apache.spark.rdd.{PartitionPruningRDD, RDD} import shark.{LogHelper, SharkConfVars, SharkEnv} import shark.execution.optimization.ColumnPruner -import shark.memstore2.{CacheType, ColumnarSerDe, MemoryMetadataManager} +import shark.memstore2.CacheType +import shark.memstore2.CacheType._ +import shark.memstore2.{ColumnarSerDe, MemoryMetadataManager} import shark.memstore2.{TablePartition, TablePartitionStats} import shark.util.HiveUtils @@ -70,22 +72,25 @@ class TableScanOperator extends TopOperator[TableScanDesc] { @BeanProperty var tableDesc: TableDesc = _ + // True if table data is stored the Spark heap. @BeanProperty var isInMemoryTableScan: Boolean = _ + @BeanProperty var cacheMode: CacheType.CacheType = _ + override def initializeOnMaster() { // Create a local copy of the HiveConf that will be assigned job properties and, for disk reads, // broadcasted to slaves. localHConf = new HiveConf(super.hconf) + cacheMode = CacheType.fromString( + tableDesc.getProperties().get("shark.cache").asInstanceOf[String]) isInMemoryTableScan = SharkEnv.memoryMetadataManager.containsTable( table.getDbName, table.getTableName) } override def outputObjectInspector() = { - val cacheMode = CacheType.fromString( - tableDesc.getProperties().get("shark.cache").asInstanceOf[String]) if (parts == null) { - val serializer = if (CacheType.shouldCache(cacheMode)) { + val serializer = if (isInMemoryTableScan || cacheMode == CacheType.TACHYON) { new ColumnarSerDe } else { tableDesc.getDeserializerClass().newInstance() @@ -94,7 +99,7 @@ class TableScanOperator extends TopOperator[TableScanDesc] { serializer.getObjectInspector() } else { val partProps = firstConfPartDesc.getProperties() - val partSerDe = if (CacheType.shouldCache(cacheMode)) { + val partSerDe = if (isInMemoryTableScan || cacheMode == CacheType.TACHYON) { new ColumnarSerDe } else { firstConfPartDesc.getDeserializerClass().newInstance() @@ -115,8 +120,6 @@ class TableScanOperator extends TopOperator[TableScanDesc] { // 1. Spark heap (block manager), accessed through the Shark MemoryMetadataManager // 2. Tachyon table // 3. Hive table on HDFS (or other Hadoop storage) - val cacheMode = CacheType.fromString( - tableDesc.getProperties().get("shark.cache").asInstanceOf[String]) // TODO(harvey): Pruning Hive-partitioned, cached tables isn't supported yet. if (isInMemoryTableScan || cacheMode == CacheType.TACHYON) { if (isInMemoryTableScan) { @@ -147,8 +150,6 @@ class TableScanOperator extends TopOperator[TableScanDesc] { // the input table and we have statistics on the table. val columnsUsed = new ColumnPruner(this, table).columnsUsed - val cacheMode = CacheType.fromString( - tableDesc.getProperties().get("shark.cache").asInstanceOf[String]) if (!table.isPartitioned && cacheMode == CacheType.TACHYON) { SharkEnv.tachyonUtil.pushDownColumnPruning(rdd, columnsUsed) } diff --git a/src/test/scala/shark/SQLSuite.scala b/src/test/scala/shark/SQLSuite.scala index 55d7ce0c..01720050 100644 --- a/src/test/scala/shark/SQLSuite.scala +++ b/src/test/scala/shark/SQLSuite.scala @@ -1042,6 +1042,10 @@ class SQLSuite extends FunSuite { val cachedCount = cachedTableCounts(i) assert(onDiskCount == cachedCount, """Num rows for %s differ across Shark metastore restart. (rows cached = %s, rows on disk = %s)""".format(tableName, cachedCount, onDiskCount)) + // Check that we're able to materialize a row - i.e., make sure that table scan operator + // doesn't try to use a ColumnarSerDe when scanning contents on disk (for our test tables, + // LazySimpleSerDes should be used). + sc.sql("select * from %s limit 1".format(tableName)) } // Finally, reload all tables. SharkRunner.loadTables()