---
layout: post
title: "A Lap around Apache Spark 1.3.1 with HDP 2.3"
date: 2014-06-24T12:34:00.001-07:00
author: Saptak Sen
tags:
modified_time: 2014-06-24T15:11:18.054-07:00
---
This guide walks you through many of the newer features of Apache Spark 1.3.1 on YARN with HDP 2.3.
Hortonworks recently announced general availability of Spark 1.3.1 on the HDP platform. Apache Spark is a fast-moving community, and Hortonworks plans frequent releases so our customers can evaluate and run the latest Spark capabilities on HDP in production.
With YARN, Hadoop can now support many types of data and application workloads; Spark on YARN becomes yet another workload running against the same set of hardware resources.
This guide describes how to:
- Run Spark on YARN and run the canonical Spark examples: SparkPi and WordCount.
- Run Spark 1.3.1 on HDP 2.3.
- Use the Spark DataFrame API.
- Work with collect_list, a built-in UDF and a key feature of Hive 0.13. This release provides support for Hive 0.13.1, and this guide shows how to call this UDF from the Spark shell.
- Use the SparkSQL Thrift JDBC/ODBC server.
- View the history of finished jobs with the Spark Job History Server.
- Use ORC files with Spark, with examples.
When you are ready to go beyond these tasks, try the machine learning examples on the Apache Spark site.
Spark 1.3.1 can be configured on any HDP 2.3 cluster, whether it is a multi-node cluster or a single-node HDP Sandbox. The instructions in this guide assume you are using the latest Hortonworks Sandbox.
To test compute-intensive tasks in Spark, the Pi example estimates Pi by “throwing darts” at a circle: it generates random points in the unit square ((0,0) to (1,1)) and counts how many fall inside the unit circle. That fraction should be approximately pi/4, which is used to estimate Pi.
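The bundled SparkPi example implements exactly this idea. As a reference, here is a minimal sketch of the same dart-throwing logic that you could paste into spark-shell (the sample count of 1,000,000 is just an illustrative choice):

```scala
// Monte Carlo estimate of Pi: sample random points in the unit square
// and count how many fall inside the unit circle (the fraction is ~pi/4).
val slices = 10
val n = 100000 * slices

val count = sc.parallelize(1 to n, slices).map { _ =>
  val x = math.random
  val y = math.random
  if (x * x + y * y < 1) 1 else 0
}.reduce(_ + _)

println("Pi is roughly " + 4.0 * count / n)
```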
To calculate Pi with Spark:
- Change to your Spark directory and become the spark OS user:
cd /usr/hdp/current/spark-client
su spark
- Run the Spark Pi example in yarn-client mode:
./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client --num-executors 3 --driver-memory 512m --executor-memory 512m --executor-cores 1 lib/spark-examples*.jar 10
Note: The Pi job should complete without any failure messages and print the estimated value of Pi in its output, for example: Pi is roughly 3.14...
Upload the input file you want to use in WordCount to HDFS. You can use any text file as input; the following example uses log4j.properties:
As user spark:
hadoop fs -copyFromLocal /etc/hadoop/conf/log4j.properties /tmp/data
To run WordCount:
./bin/spark-shell --master yarn-client --driver-memory 512m --executor-memory 512m
After some startup output, the Scala REPL prompt, scala>, displays. At the prompt, type the following:
val file = sc.textFile("/tmp/data")
val counts = file.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)
counts.saveAsTextFile("/tmp/wordcount")
To view the output in the scala shell:
counts.count()
To print the full output of the WordCount job:
counts.collect().foreach(println)
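If you only care about the most frequent words, a small follow-on sketch (not part of the original walkthrough) swaps each (word, count) pair and sorts by count:

```scala
// Show the ten most frequent words, highest count first
counts.map { case (word, n) => (n, word) }
      .sortByKey(ascending = false)
      .take(10)
      .foreach(println)
```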
To read the output of WordCount with HDFS commands, first exit the Scala shell:
exit
View WordCount Results:
hadoop fs -ls /tmp/wordcount
It should list the job's output files, for example part-00000 and a _SUCCESS marker.
Use the HDFS cat command to see the WordCount output. For example,
hadoop fs -cat /tmp/wordcount/part-00000
The DataFrame API is a new feature in Spark 1.3.1. It provides easier access to data, since a DataFrame conceptually looks like a table, a structure that many developers coming from Python, R, and pandas are already familiar with.
Let's upload the people text and JSON files to HDFS:
cd /usr/hdp/current/spark-client
su spark
hdfs dfs -copyFromLocal examples/src/main/resources/people.txt people.txt
hdfs dfs -copyFromLocal examples/src/main/resources/people.json people.json
Then let's launch the Spark Shell
cd /usr/hdp/current/spark-client
su spark
./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
At the Spark shell, type the following:
val df = sqlContext.jsonFile("people.json")
This produces output showing the schema that Spark inferred from the underlying people.json.
Now print the contents of the DataFrame with df.show:
df.show
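As a quick aside, you can also print the inferred schema directly, which is handy for checking column names and types before querying:

```scala
// Print the schema Spark inferred from people.json as a tree
df.printSchema()
```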
import org.apache.spark.sql.functions._
// Select everybody, but increment the age by 1
df.select(df("name"), df("age") + 1).show()
// Select people older than 21
df.filter(df("age") > 21).show()
// Count people by age
df.groupBy("age").count().show()
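If you want to persist one of these query results, one option is to write it out as JSON text through the RDD API (a small sketch; the output path is illustrative):

```scala
// Write the filtered result back to HDFS as JSON text
df.filter(df("age") > 21).toJSON.saveAsTextFile("/tmp/people_over_21_json")
```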
You can also construct a DataFrame programmatically by applying an explicit schema to an RDD of Rows and then querying it with SQL:
import org.apache.spark.sql._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val people = sc.textFile("people.txt")
val schemaString = "name age"
import org.apache.spark.sql.types.{StructType,StructField,StringType}
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
peopleDataFrame.registerTempTable("people")
val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name: " + t(0)).collect().foreach(println)
This prints one Name: line for each person in people.txt.
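As an alternative to spelling out the StructType by hand, Spark can also infer the schema by reflection from a case class. Here is a minimal sketch of that approach (the Person case class and the temp table name are illustrative, not part of the guide):

```scala
// Infer the schema from a case class instead of an explicit StructType
import sqlContext.implicits._

case class Person(name: String, age: Int)

val peopleDF = sc.textFile("people.txt")
  .map(_.split(","))
  .map(p => Person(p(0), p(1).trim.toInt))
  .toDF()

peopleDF.registerTempTable("peopleByReflection")
sqlContext.sql("SELECT name FROM peopleByReflection WHERE age > 20").collect().foreach(println)
```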
Hive 0.13.1 provides a new built-in UDF, collect_list(col), which returns a list of objects with duplicates. The example below reads from and writes to HDFS under the Hive directories. In a production environment you need the appropriate HDFS permissions; for evaluation, however, you can run this entire section as the hdfs user.
Before running the Hive examples, run the following steps:
su hdfs
./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
The shell responds with the new hiveContext value.
hiveContext.sql("CREATE TABLE IF NOT EXISTS TestTable (key INT, value STRING)")
The command returns once the table is created.
hiveContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE TestTable")
The command returns once the data is loaded into TestTable.
hiveContext.sql("from TestTable SELECT key, collect_list(value) group by key order by key").collect.foreach(println)
You should see each key along with its collected list of values.
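For comparison, the same grouping can be expressed with plain RDD operations over the Hive table (a sketch that is not part of the original guide):

```scala
// Group the values per key with RDD operations instead of collect_list
hiveContext.sql("SELECT key, value FROM TestTable")
  .map(row => (row.getInt(0), row.getString(1)))
  .groupByKey()
  .sortByKey()
  .map { case (key, values) => (key, values.toList) }
  .collect()
  .foreach(println)
```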
In this release, we have implemented full support for ORC files with Spark. We will walk through an example that reads and writes an ORC file and uses the ORC structure to infer a table.
hiveContext.sql("create table orc_table(key INT, value STRING) stored as orc")
hiveContext.sql("INSERT INTO table orc_table select * from testtable")
hiveContext.sql("FROM orc_table SELECT *").collect().foreach(println)
val inputRead = sc.hadoopFile("/apps/hive/warehouse/orc_table", classOf[org.apache.hadoop.hive.ql.io.orc.OrcInputFormat],classOf[org.apache.hadoop.io.NullWritable],classOf[org.apache.hadoop.hive.ql.io.orc.OrcStruct])
val k = inputRead.map(pair => pair._2.toString)
val c = k.collect
The shell prints the collected rows; each OrcStruct is rendered as a string.
Next, let's create an ORC file directly from a DataFrame and read it back. As the spark user, upload people.txt to HDFS:
cd /usr/hdp/current/spark-client
su spark
hdfs dfs -put examples/src/main/resources/people.txt people.txt
./bin/spark-shell --num-executors 2 --executor-memory 512m --master yarn-client
At the Scala prompt, type the following (the interleaved comment lines describe each step and do not need to be typed):
import org.apache.spark.sql.hive.orc._
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StructType,StructField,StringType}
val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)
Load the data and build an RDD of Rows
val people = sc.textFile("people.txt")
val schemaString = "name age"
val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // keep age as a String to match the StringType schema above
Apply the schema to the RDD
val peopleSchemaRDD = hiveContext.applySchema(rowRDD, schema)
Register the result as a temporary table
peopleSchemaRDD.registerTempTable("people")
val results = hiveContext.sql("SELECT * FROM people")
results.map(t => "Name: " + t.toString).collect().foreach(println)
Save Table to ORCFile
peopleSchemaRDD.saveAsOrcFile("people.orc")
Create Table from ORCFile
val morePeople = hiveContext.orcFile("people.orc")
morePeople.registerTempTable("morePeople")
Query from the table
hiveContext.sql("SELECT * from morePeople").collect.foreach(println)
With this release, SparkSQL's Thrift server provides JDBC access to SparkSQL. (A programmatic JDBC sketch follows the steps below.)
- Start the Thrift server. From SPARK_HOME, start the SparkSQL Thrift server and note the port value of the Thrift JDBC server:
su spark
./sbin/start-thriftserver.sh --master yarn-client --executor-memory 512m --hiveconf hive.server2.thrift.port=10001
- Connect to the Thrift server over Beeline. Launch Beeline from SPARK_HOME:
su spark
./bin/beeline
- Connect to Thrift Server & Issue SQL commands On beeline prompt
!connect jdbc:hive2://localhost:10001
Note: this example runs without security enabled, so any username and password should work.
Note: the connection may take a few seconds to become available; in a Sandbox environment, wait 10-15 seconds before running show tables.
show tables;
Type Ctrl+C to exit Beeline.
- Stop Thrift Server
./sbin/stop-thriftserver.sh
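As referenced above, you can also talk to the Thrift server programmatically. Here is a minimal JDBC sketch in Scala, assuming the Hive JDBC driver (org.apache.hive.jdbc.HiveDriver) is on the classpath and the server is still listening on port 10001; run it before stopping the server:

```scala
// Connect to the SparkSQL Thrift server over JDBC and list the tables
import java.sql.DriverManager

Class.forName("org.apache.hive.jdbc.HiveDriver")

val conn = DriverManager.getConnection("jdbc:hive2://localhost:10001", "", "")
val stmt = conn.createStatement()
val rs = stmt.executeQuery("show tables")

while (rs.next()) {
  println(rs.getString(1))
}

rs.close()
stmt.close()
conn.close()
```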
The Spark Job History Server is integrated with YARN's Application Timeline Server (ATS) and publishes job metrics to ATS, which makes job details available after a job finishes.
- Start Spark History Server
./sbin/start-history-server.sh
You can leave the History Server running while you run the examples, then go to the YARN Resource Manager page at http://127.0.0.1:8088/cluster/apps to see the logs of finished applications through the History Server.
- Stop Spark History Server
./sbin/stop-history-server.sh
Visit http://hortonworks.com/tutorials for more tutorials on Apache Spark.