rxin edited this page Feb 5, 2013 · 46 revisions

Setup

Launch AMI

~/Documents/shark/mesos/trunk/ec2/mesos-ec2 -k rxin-us-east \
-i ~/.ec2/rxin-us-east.pem -s 100 --ami ami-fef35597 -t m2.2xlarge launch shark-sigmod

Update Shark/Spark (optional)

Set up AWS credentials for s3n in ephemeral-hdfs/conf/core-site.xml

<property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>ID</value>
</property>

<property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>SECRET</value>
</property>
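As a quick sanity check before restarting anything, the fragment below (a standalone sketch, not part of the deployment) parses a minimal core-site.xml containing the two s3n credential properties and confirms both are present; the property names are the ones Hadoop's S3 native filesystem reads.

```python
import xml.etree.ElementTree as ET

# Minimal core-site.xml fragment with the two s3n credential properties.
conf = """
<configuration>
  <property>
    <name>fs.s3n.awsAccessKeyId</name>
    <value>ID</value>
  </property>
  <property>
    <name>fs.s3n.awsSecretAccessKey</name>
    <value>SECRET</value>
  </property>
</configuration>
"""

root = ET.fromstring(conf)
props = {p.findtext("name"): p.findtext("value") for p in root.findall("property")}
assert "fs.s3n.awsAccessKeyId" in props
assert "fs.s3n.awsSecretAccessKey" in props
```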

Start MapReduce job tracker

/root/ephemeral-hdfs/bin/start-mapred.sh

Copy files over from S3

ephemeral-hdfs/bin/hadoop distcp s3n://wiki-traffic/pagecounts hdfs://`hostname`:9000/wiki/pagecounts
ephemeral-hdfs/bin/hadoop distcp s3n://wiki-traffic/wiki-dump hdfs://`hostname`:9000/wiki/dump

Set ulimit

ulimit -n 1000000
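`ulimit -n` only raises the limit for the current shell and its children, so it has to run in the same session that launches the daemons. A hedged way to confirm the limit actually took effect inside a process is Python's resource module:

```python
import resource

# Soft and hard limits on open file descriptors for this process;
# the soft limit is what a too-low ulimit would show.
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
print("open-file limit: soft=%s hard=%s" % (soft, hard))
```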

Run the renaming script and load all tables into Hive

cp /root/ephemeral-hdfs/conf/core-site.xml /root/spark/conf/core-site.xml

Run the file renaming script in Spark: https://gist.github.com/2854326

Run the generated hive.q file in Shark

/root/shark/bin/shark-withinfo -f "/root/spark/hive.q"

Setup

Prep data

create table wiki_filtered as select * from wikistats where (dt like '200902%' or dt like '200901%') and project_code = 'en' and not page_name like 'Special:%';
create external table text_with_title (title string, body string) row format delimited fields terminated by '\t' location '/wiki/dump/text_with_title/';

Cache data

create table wiki_cached as select * from wiki_filtered;
create table page_cached as select * from text_with_title;
create table top_pages_cached as select page_name, sum(page_views) s from wiki_cached where dt='20090214' group by page_name order by s desc limit 100;

Queries

Some configs:

set mapred.job.reuse.jvm.num.tasks=-1;
set hive.merge.mapfiles=false;
set mapred.reduce.tasks=399;
set hive.map.aggr=false;

Query 1: Top Pages on Valentine's Day (20 secs)

set mapred.reduce.tasks=399;
select page_name, sum(page_views) s from wiki_cached where dt='20090214' group by page_name order by s desc limit 20;
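The query groups the day's rows by page, sums the view counts, and keeps the top 20. A toy sketch of the same group-by/sum/top-N logic on in-memory rows (the sample data here is made up):

```python
from collections import defaultdict

# (page_name, dt, page_views) rows -- invented sample data.
rows = [
    ("Valentine's_Day", "20090214", 500),
    ("Main_Page", "20090214", 900),
    ("Valentine's_Day", "20090214", 300),
    ("Main_Page", "20090213", 100),  # dropped by the dt filter
]

totals = defaultdict(int)
for page, dt, views in rows:
    if dt == "20090214":
        totals[page] += views

# order by s desc limit 20
top = sorted(totals.items(), key=lambda kv: -kv[1])[:20]
print(top)  # [('Main_Page', 900), ("Valentine's_Day", 800)]
```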

Query 2: Hits on pages related to Valentine's day (6 secs)

set mapred.reduce.tasks=9;
select page_name, sum(page_views) s from wiki_cached where page_name like '%Valentine%' group by page_name order by s desc limit 10;

Query 3: Histogram (6 secs)

set mapred.reduce.tasks=11;
select dt, sum(page_views) s from wiki_cached where page_name like 'Valentine%' group by dt order by dt limit 100;

Query 4: Join (20 secs)

set mapred.reduce.tasks=399;
create table top_pages_with_body_cached as select * from page_cached p join top_pages_cached t on p.title=t.page_name;
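The join pairs each cached page body with its entry in the precomputed top-pages table; in plain terms it is an inner join on title = page_name, sketched here with a dict lookup (sample data invented):

```python
# title -> body, standing in for page_cached.
pages = {"Main_Page": "welcome text", "Obscure_Page": "rarely read"}
# (page_name, total views), standing in for top_pages_cached.
top_pages = [("Main_Page", 900)]

# Inner join: keep only titles present on both sides.
joined = [(title, pages[title], s) for title, s in top_pages if title in pages]
print(joined)  # [('Main_Page', 'welcome text', 900)]
```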

In the Shark shell

import spark.examples.TfIdf
val inputPages = sql2rdd("select title, body, s from top_pages_with_body_cached order by s desc limit 100")
val docs = inputPages.mapRows { r => (r.getString(0), r.getString(1).replaceAll("\\n"," ").split("\\W").filter(_ != "").mkString(" ").toLowerCase) }.cache
val docVecs = TfIdf.termVectors(docs, 1000, 200).cache()
val seqs = TfIdf.kmeans(docVecs, 10, 20)
for ((seq, i) <- seqs.zipWithIndex) { println("Cluster " + i + ": " + seq.take(8).mkString(", ")) }
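TfIdf.termVectors and TfIdf.kmeans come from the spark.examples package and their internals aren't shown here, but the TF-IDF weighting such a routine is presumed to apply can be sketched as follows (a hedged illustration on made-up documents, not the actual implementation):

```python
import math
from collections import Counter

# Two tiny invented documents, already tokenized like the mapRows step above.
docs = {
    "doc1": "valentine day valentine card",
    "doc2": "main page news day",
}

# Document frequency: in how many documents each term appears.
df = Counter()
for text in docs.values():
    df.update(set(text.split()))

n_docs = len(docs)

def tfidf(text):
    tf = Counter(text.split())
    # weight = tf * idf, with idf = log(N / df)
    return {t: c * math.log(n_docs / df[t]) for t, c in tf.items()}

vec = tfidf(docs["doc1"])
# "day" appears in every document, so idf = log(2/2) = 0 and its weight vanishes;
# "valentine" is distinctive to doc1 and keeps a positive weight.
assert vec["day"] == 0.0
assert vec["valentine"] > 0
```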