diff --git a/README.md b/README.md
index 6fc1561..ced9b83 100755
--- a/README.md
+++ b/README.md
@@ -55,6 +55,7 @@ When reading files the API accepts several options:
 * `charset`: defaults to 'UTF-8' but can be set to other valid charset names
 * `inferSchema`: automatically infers column types. It requires one extra pass over the data and is false by default
 * `comment`: skip lines beginning with this character. Default is `"#"`. Disable comments by setting this to `null`.
+* `codec`: compression codec to use when saving to file. Should be the fully qualified name of a class implementing `org.apache.hadoop.io.compress.CompressionCodec`. Defaults to no compression when a codec is not specified.
 
 The package also support saving simple (non-nested) DataFrame. When saving you can specify the delimiter and
 whether we should generate a header row for the table. See following examples for more details.
@@ -127,6 +128,24 @@ selectedData.write
     .save("newcars.csv")
 ```
 
+You can save with compressed output:
+```scala
+import org.apache.spark.sql.SQLContext
+
+val sqlContext = new SQLContext(sc)
+val df = sqlContext.read
+    .format("com.databricks.spark.csv")
+    .option("header", "true") // Use first line of all files as header
+    .option("inferSchema", "true") // Automatically infer data types
+    .load("cars.csv")
+
+val selectedData = df.select("year", "model")
+selectedData.write
+    .format("com.databricks.spark.csv")
+    .option("header", "true")
+    .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
+    .save("newcars.csv.gz")
+```
 
 __Spark 1.3:__
 
@@ -209,7 +228,23 @@ df.select("year", "model").write()
     .save("newcars.csv");
 ```
 
+You can save with compressed output:
+```java
+import org.apache.spark.sql.SQLContext;
+SQLContext sqlContext = new SQLContext(sc);
+DataFrame df = sqlContext.read()
+    .format("com.databricks.spark.csv")
+    .option("inferSchema", "true")
+    .option("header", "true")
+    .load("cars.csv");
+
+df.select("year", "model").write()
+    .format("com.databricks.spark.csv")
+    .option("header", "true")
+    .option("codec", "org.apache.hadoop.io.compress.GzipCodec")
+    .save("newcars.csv");
+```
 
 __Spark 1.3:__
 
@@ -235,7 +270,7 @@ import org.apache.spark.sql.types.*;
 SQLContext sqlContext = new SQLContext(sc);
 
 StructType customSchema = new StructType(
-    new StructField("year", IntegerType, true), 
+    new StructField("year", IntegerType, true),
     new StructField("make", StringType, true),
     new StructField("model", StringType, true),
     new StructField("comment", StringType, true),
@@ -250,6 +285,29 @@ DataFrame df = sqlContext.load("com.databricks.spark.csv", customSchema, options
 
 df.select("year", "model").save("newcars.csv", "com.databricks.spark.csv");
 ```
 
+You can save with compressed output:
+```java
+import org.apache.spark.sql.SQLContext;
+import org.apache.spark.sql.SaveMode;
+
+SQLContext sqlContext = new SQLContext(sc);
+
+HashMap<String, String> options = new HashMap<String, String>();
+options.put("header", "true");
+options.put("path", "cars.csv");
+options.put("inferSchema", "true");
+
+DataFrame df = sqlContext.load("com.databricks.spark.csv", options);
+
+HashMap<String, String> saveOptions = new HashMap<String, String>();
+saveOptions.put("header", "true");
+saveOptions.put("path", "newcars.csv");
+saveOptions.put("codec", "org.apache.hadoop.io.compress.GzipCodec");
+
+df.select("year", "model").save("com.databricks.spark.csv", SaveMode.Overwrite,
+    saveOptions);
+```
+
 ### Python API
 
 __Spark 1.4+:__
@@ -286,6 +344,14 @@ df.select('year', 'model').write \
     .save('newcars.csv')
 ```
 
+You can save with compressed output:
+```python
+from pyspark.sql import SQLContext
+sqlContext = SQLContext(sc)
+
+df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferSchema='true').load('cars.csv')
+df.select('year', 'model').write.format('com.databricks.spark.csv').options(codec="org.apache.hadoop.io.compress.GzipCodec").save('newcars.csv')
+```
 
 __Spark 1.3:__
 
@@ -315,6 +381,15 @@ df = sqlContext.load(source="com.databricks.spark.csv", header = 'true', schema
 df.select('year', 'model').save('newcars.csv', 'com.databricks.spark.csv')
 ```
 
+You can save with compressed output:
+```python
+from pyspark.sql import SQLContext
+sqlContext = SQLContext(sc)
+
+df = sqlContext.load(source="com.databricks.spark.csv", header = 'true', inferSchema = 'true', path = 'cars.csv')
+df.select('year', 'model').save('newcars.csv', 'com.databricks.spark.csv', codec="org.apache.hadoop.io.compress.GzipCodec")
+```
+
 ### R API
 
 __Spark 1.4+:__
@@ -337,7 +412,7 @@ library(SparkR)
 Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.3.0" "sparkr-shell"')
 sqlContext <- sparkRSQL.init(sc)
 customSchema <- structType(
-    structField("year", "integer"), 
+    structField("year", "integer"),
     structField("make", "string"),
     structField("model", "string"),
     structField("comment", "string"),
@@ -348,5 +423,17 @@ df <- read.df(sqlContext, "cars.csv", source = "com.databricks.spark.csv", schem
 
 write.df(df, "newcars.csv", "com.databricks.spark.csv", "overwrite")
 ```
 
+You can save with compressed output:
+```R
+library(SparkR)
+
+Sys.setenv('SPARKR_SUBMIT_ARGS'='"--packages" "com.databricks:spark-csv_2.10:1.3.0" "sparkr-shell"')
+sqlContext <- sparkRSQL.init(sc)
+
+df <- read.df(sqlContext, "cars.csv", source = "com.databricks.spark.csv", inferSchema = "true")
+
+write.df(df, "newcars.csv", "com.databricks.spark.csv", "overwrite", codec="org.apache.hadoop.io.compress.GzipCodec")
+```
+
 ## Building From Source
 This library is built with [SBT](http://www.scala-sbt.org/0.13/docs/Command-Line-Reference.html), which is automatically downloaded by the included shell script. To build a JAR file simply run `sbt/sbt package` from the project root. The build configuration includes support for both Scala 2.10 and 2.11.
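For illustration only (not part of this patch): since the `codec` option takes the fully qualified name of any `org.apache.hadoop.io.compress.CompressionCodec` implementation, other Hadoop codecs can be plugged in the same way. A minimal Scala sketch, assuming `BZip2Codec` from hadoop-common is on the classpath:

```scala
// Illustrative sketch only: swapping GzipCodec for another Hadoop codec by class name.
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .load("cars.csv")

df.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .option("codec", "org.apache.hadoop.io.compress.BZip2Codec")
  .save("newcars.csv.bz2")
```
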
diff --git a/src/main/scala/com/databricks/spark/csv/CsvParser.scala b/src/main/scala/com/databricks/spark/csv/CsvParser.scala
index b19244b..0a2f914 100644
--- a/src/main/scala/com/databricks/spark/csv/CsvParser.scala
+++ b/src/main/scala/com/databricks/spark/csv/CsvParser.scala
@@ -39,6 +39,7 @@ class CsvParser extends Serializable {
   private var parserLib: String = ParserLibs.DEFAULT
   private var charset: String = TextFile.DEFAULT_CHARSET.name()
   private var inferSchema: Boolean = false
+  private var codec: String = null
 
   def withUseHeader(flag: Boolean): CsvParser = {
     this.useHeader = flag
@@ -105,6 +106,11 @@ class CsvParser extends Serializable {
     this
   }
 
+  def withCompression(codec: String): CsvParser = {
+    this.codec = codec
+    this
+  }
+
   /** Returns a Schema RDD for the given CSV path. */
   @throws[RuntimeException]
   def csvFile(sqlContext: SQLContext, path: String): DataFrame = {
@@ -122,7 +128,8 @@
       ignoreTrailingWhiteSpace,
       treatEmptyValuesAsNulls,
       schema,
-      inferSchema)(sqlContext)
+      inferSchema,
+      codec)(sqlContext)
     sqlContext.baseRelationToDataFrame(relation)
   }
 
@@ -141,7 +148,8 @@
       ignoreTrailingWhiteSpace,
       treatEmptyValuesAsNulls,
       schema,
-      inferSchema)(sqlContext)
+      inferSchema,
+      codec)(sqlContext)
     sqlContext.baseRelationToDataFrame(relation)
   }
 }
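For illustration only (not part of this patch): a sketch of the `withCompression` builder added above. The codec name is stored on the parser and passed into the `CsvRelation` it builds, so it takes effect when that relation is written to (for example through `INSERT OVERWRITE` on a registered table), as wired up in the CsvRelation and DefaultSource changes below.

```scala
// Illustrative sketch only: the codec configured here is applied when the resulting
// relation is written, not when the CSV file is read.
import com.databricks.spark.csv.CsvParser
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val cars = new CsvParser()
  .withUseHeader(true)
  .withCompression("org.apache.hadoop.io.compress.GzipCodec")
  .csvFile(sqlContext, "cars.csv")
```
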
diff --git a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
index c5031ac..196734d 100755
--- a/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
+++ b/src/main/scala/com/databricks/spark/csv/CsvRelation.scala
@@ -45,7 +45,8 @@ case class CsvRelation protected[spark] (
     ignoreTrailingWhiteSpace: Boolean,
     treatEmptyValuesAsNulls: Boolean,
     userSchema: StructType = null,
-    inferCsvSchema: Boolean)(@transient val sqlContext: SQLContext)
+    inferCsvSchema: Boolean,
+    codec: String = null)(@transient val sqlContext: SQLContext)
   extends BaseRelation with TableScan with PrunedScan with InsertableRelation {
 
   /**
@@ -314,7 +315,10 @@ case class CsvRelation protected[spark] (
           + s" to INSERT OVERWRITE a CSV table:\n${e.toString}")
       }
       // Write the data. We assume that schema isn't changed, and we won't update it.
-      data.saveAsCsvFile(filesystemPath.toString, Map("delimiter" -> delimiter.toString))
+
+      val codecClass = compressionCodecClass(codec)
+      data.saveAsCsvFile(filesystemPath.toString, Map("delimiter" -> delimiter.toString),
+        codecClass)
     } else {
       sys.error("CSV tables only support INSERT OVERWRITE for now.")
     }
diff --git a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala
index 57f6bfa..13abf04 100755
--- a/src/main/scala/com/databricks/spark/csv/DefaultSource.scala
+++ b/src/main/scala/com/databricks/spark/csv/DefaultSource.scala
@@ -137,6 +137,8 @@ class DefaultSource
       throw new Exception("Infer schema flag can be true or false")
     }
 
+    val codec = parameters.getOrElse("codec", null)
+
     CsvRelation(
       () => TextFile.withCharset(sqlContext.sparkContext, path, charset),
       Some(path),
@@ -151,7 +153,8 @@ class DefaultSource
       ignoreTrailingWhiteSpaceFlag,
       treatEmptyValuesAsNullsFlag,
       schema,
-      inferSchemaFlag)(sqlContext)
+      inferSchemaFlag,
+      codec)(sqlContext)
   }
 
   override def createRelation(
@@ -178,7 +181,8 @@ class DefaultSource
     }
     if (doSave) {
       // Only save data when the save mode is not ignore.
-      data.saveAsCsvFile(path, parameters)
+      val codecClass = compressionCodecClass(parameters.getOrElse("codec", null))
+      data.saveAsCsvFile(path, parameters, codecClass)
     }
 
     createRelation(sqlContext, parameters, data.schema)
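For illustration only (not part of this patch): the save path above resolves the `codec` option to a codec class and hands it to `saveAsCsvFile`, whose extended signature appears in the package.scala hunk below. The equivalent direct call, assuming the implicits brought in by `import com.databricks.spark.csv._` as in the README examples:

```scala
// Illustrative sketch only: passing an explicit codec class to saveAsCsvFile, which is
// what DefaultSource does after resolving the "codec" option string.
import com.databricks.spark.csv._
import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.SQLContext

val sqlContext = new SQLContext(sc)
val df = sqlContext.csvFile("cars.csv")
df.saveAsCsvFile("newcars.csv", Map("delimiter" -> ","), classOf[GzipCodec])
```
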
diff --git a/src/main/scala/com/databricks/spark/csv/package.scala b/src/main/scala/com/databricks/spark/csv/package.scala
index 7fc399f..71dab64 100755
--- a/src/main/scala/com/databricks/spark/csv/package.scala
+++ b/src/main/scala/com/databricks/spark/csv/package.scala
@@ -26,6 +26,16 @@ package object csv {
   val defaultCsvFormat =
     CSVFormat.DEFAULT.withRecordSeparator(System.getProperty("line.separator", "\n"))
 
+  private[csv] def compressionCodecClass(className: String): Class[_ <: CompressionCodec] = {
+    className match {
+      case null => null
+      case codec =>
+        // scalastyle:off classforname
+        Class.forName(codec).asInstanceOf[Class[CompressionCodec]]
+        // scalastyle:on classforname
+    }
+  }
+
   /**
    * Adds a method, `csvFile`, to SQLContext that allows reading CSV data.
    */
@@ -90,6 +100,8 @@ package object csv {
 
   /**
    * Saves DataFrame as csv files. By default uses ',' as delimiter, and includes header line.
+   * If compressionCodec is not null the resulting output will be compressed.
+   * Note that a codec entry in the parameters map will be ignored.
    */
   def saveAsCsvFile(path: String, parameters: Map[String, String] = Map(),
                     compressionCodec: Class[_ <: CompressionCodec] = null): Unit = {
diff --git a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
index e4685eb..4f38daa 100755
--- a/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
+++ b/src/test/scala/com/databricks/spark/csv/CsvSuite.scala
@@ -21,7 +21,7 @@ import java.sql.Timestamp
 
 import com.databricks.spark.csv.util.ParseModes
 import org.apache.hadoop.io.compress.GzipCodec
-import org.apache.spark.sql.{SQLContext, Row}
+import org.apache.spark.sql.{SQLContext, Row, SaveMode}
 import org.apache.spark.{SparkContext, SparkException}
 import org.apache.spark.sql.types._
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -431,6 +431,26 @@ abstract class AbstractCsvSuite extends FunSuite with BeforeAndAfterAll {
     assert(carsCopy.collect.map(_.toString).toSet == cars.collect.map(_.toString).toSet)
   }
 
+  test("Scala API save with gzip compression codec") {
+    // Create temp directory
+    TestUtils.deleteRecursively(new File(tempEmptyDir))
+    new File(tempEmptyDir).mkdirs()
+    val copyFilePath = tempEmptyDir + "cars-copy.csv"
+
+    val cars = sqlContext.csvFile(carsFile, parserLib = parserLib)
+    cars.save("com.databricks.spark.csv", SaveMode.Overwrite,
+      Map("path" -> copyFilePath, "header" -> "true", "codec" -> classOf[GzipCodec].getName))
+    val carsCopyPartFile = new File(copyFilePath, "part-00000.gz")
+    // Check that the part file has a .gz extension
+    assert(carsCopyPartFile.exists())
+
+    val carsCopy = sqlContext.csvFile(copyFilePath + "/")
+
+    assert(carsCopy.count == cars.count)
+    assert(carsCopy.collect.map(_.toString).toSet == cars.collect.map(_.toString).toSet)
+  }
+
   test("DSL save with quoting") {
     // Create temp directory
     TestUtils.deleteRecursively(new File(tempEmptyDir))