
Commit

stuff
havron committed Dec 14, 2016
1 parent c9de69a commit 69f22c7
Showing 5 changed files with 119 additions and 6 deletions.
6 changes: 5 additions & 1 deletion Makefile
@@ -1,10 +1,14 @@
-.PHONY: github run dev database clean shell
+.PHONY: github run dev database clean shell spark
MSG=small edit
dev:
	sudo docker-compose down
	sudo docker-compose build
	sudo docker-compose up

spark:
	sudo docker-compose -f spark-compose.yml down
	sudo docker-compose -f spark-compose.yml up

github:
	git add -A
	git commit -m "${MSG}"
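# Usage sketch: `make spark` (re)starts the standalone Spark cluster defined in
# spark-compose.yml; `make github MSG="describe the change"` overrides the
# default commit message set above.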
4 changes: 2 additions & 2 deletions docker-compose.yml
@@ -18,7 +18,7 @@ models1: # PORT 8021
    - ./models/:/app
  ports:
    - "8021:8000"
-  command: bash -c "sleep 25 && pip install -r requirements.txt && python manage.py loaddata db.json && mod_wsgi-express start-server --reload-on-changes ./models/wsgi.py"
+  command: bash -c "sleep 45 && pip install -r requirements.txt && python manage.py loaddata db.json && mod_wsgi-express start-server --reload-on-changes ./models/wsgi.py"
  container_name: models1
  # sleep to wait for models0 to spin up and migrate the db
  # could use `wait-for-it` instead...
@@ -31,7 +31,7 @@ models2: # PORT 8022
    - ./models/:/app
  ports:
    - "8022:8000"
-  command: bash -c "sleep 25 && pip install -r requirements.txt && python manage.py loaddata db.json && mod_wsgi-express start-server --reload-on-changes ./models/wsgi.py"
+  command: bash -c "sleep 45 && pip install -r requirements.txt && python manage.py loaddata db.json && mod_wsgi-express start-server --reload-on-changes ./models/wsgi.py"
  container_name: models2
  # sleep to wait for models0 to spin up and migrate the db
  # could use `wait-for-it` instead...
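  # As that comment suggests, `wait-for-it` could replace the fixed sleep; a
  # sketch (untested here; assumes models0 listens on port 8000 and that
  # wait-for-it.sh is available in the image):
  #   command: bash -c "./wait-for-it.sh models0:8000 -t 60 -- pip install -r requirements.txt && python manage.py loaddata db.json && mod_wsgi-express start-server --reload-on-changes ./models/wsgi.py"
  # wait-for-it blocks until the host:port accepts TCP connections, then runs
  # the command after `--`, instead of guessing a safe delay.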
8 changes: 7 additions & 1 deletion recs/access.log
@@ -1,4 +1,10 @@
tp 4
bob 5
tp 4
-hank 3
\ No newline at end of file
+hank 3
tp 3
tp 5
tp 9
tp 20
hank 20
hank 9
57 changes: 55 additions & 2 deletions recs/spark.py
@@ -1,17 +1,70 @@
from pyspark import SparkContext
import itertools

sc = SparkContext("spark://spark-master:7077", "PopularItems")

data = sc.textFile("/tmp/data/access.log", 2) # each worker loads a piece of the data file

-pairs = data.map(lambda line: line.split("\t")) # tell each worker to split each line of its partition
+pairs = data.map(lambda line: tuple(line.split("\t"))) # tell each worker to split each line of its partition
pages = pairs.map(lambda pair: (pair[1], 1)) # re-lay out the data to ignore the user id
count = pages.reduceByKey(lambda x, y: x + y) # shuffle the data so that each key is only on one worker
# and then reduce all the values by adding them together

print(pairs.collect())
print(pages.collect())
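
# For illustration: with the access.log above, pairs.collect() would look like
# [('tp', '4'), ('bob', '5'), ('tp', '4'), ('hank', '3'), ...], pages.collect()
# like [('4', 1), ('5', 1), ('4', 1), ('3', 1), ...], and count would hold one
# (item, clicks) entry per item, e.g. ('4', 2) since item 4 was clicked twice.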


output = count.collect() # bring the data back to the master node so we can print it out
for page_id, n in output: # n, not count: avoid shadowing the count RDD
    print("page_id %s count %d" % (page_id, n))
print("Popular items done")

-sc.stop()
# Group data into [(user_id, [items clicked on])]
clicks = pairs.groupByKey()
clickpairs = clicks.map(lambda click: (click[0], itertools.combinations(click[1], 2))) # recomputed below after distinct()

for click in clicks.collect():
    print(click[0] + " clicked on the following items: " + str(list(click[1])))
    print(str(list(itertools.combinations(click[1], 2))))




# Re-group with duplicate clicks removed; sorting each user's items gives every
# unordered pair one canonical order across users
clicks = pairs.distinct().groupByKey()
clickpairs = clicks.map(lambda click: (click[0], list(itertools.combinations(sorted(click[1]), 2))))

print("Click pairs: " + str(clickpairs.collect()))


cl = clickpairs.collect()

for c in cl:
    print("mapping is " + str((c[0], c[1])))
    print("NEW mapping is " + str(list((ca, c[0]) for ca in c[1])))


# Emit one ((item1, item2), user) record per item pair each user clicked
# (flatMap flattens the per-user lists into a single RDD of records)
coclicks = clickpairs.flatMap(lambda pair: [(itempair, pair[0]) for itempair in pair[1]])

p = coclicks.map(lambda pair: (pair[0], 1)) # one vote per (item pair, user)
p2 = p.reduceByKey(lambda x, y: x + y) # number of users who co-clicked each pair

p3 = p2.filter(lambda pair: pair[1] >= 3) # keep pairs co-clicked by at least 3 users

print("CO-CLICKS: "+str(coclicks.collect()))
print("p1: "+str(p.collect()))
print("p2: "+str(p2.collect()))
print("p3: "+str(p3.collect()))



# The intended pipeline, end to end (a compact sketch follows below):
#
# 1. Transform into (user_id, (item1, item2)) where item1 and item2 are pairs of items the user clicked on
#
# 2. Transform into ((item1, item2), [user1, user2, ...]) where the users are all the ones who co-clicked (item1, item2)
#
# 3. Transform into ((item1, item2), count of distinct users who co-clicked (item1, item2))
#
# 4. Filter out any results where fewer than 3 users co-clicked the same pair of items
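
# A compact sketch of steps 1-4, chained end to end (illustrative, not part of
# the pipeline above; reuses `pairs` and `itertools` from earlier):
coclick_counts = (pairs.distinct()                    # unique (user, item) clicks
                  .groupByKey()                       # (user, [items])
                  .flatMap(lambda kv: [(ip, kv[0])    # ((item1, item2), user)
                                       for ip in itertools.combinations(sorted(kv[1]), 2)])
                  .distinct()                         # one vote per user per item pair
                  .map(lambda kv: (kv[0], 1))
                  .reduceByKey(lambda x, y: x + y)    # distinct co-clicking users per pair
                  .filter(lambda kv: kv[1] >= 3))     # step 4: threshold of 3 users
print("co-click counts (sketch): " + str(coclick_counts.collect()))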

+sc.stop()
50 changes: 50 additions & 0 deletions spark-compose.yml
@@ -0,0 +1,50 @@
###### SPARK CONTAINERS ######
spark-master:
  image: gettyimages/spark
  command: bin/spark-class org.apache.spark.deploy.master.Master -h spark-master
  container_name: spark-master
  hostname: spark-master
  environment:
    MASTER: spark://spark-master:7077
    SPARK_CONF_DIR: /conf
  expose:
    - 7001
    - 7002
    - 7003
    - 7004
    - 7005
    - 7006
    - 7077
    - 6066
  ports:
    - 4040:4040
    - 6066:6066
    - 7077:7077
    - 8080:8080
  volumes:
    - ./recs:/tmp/data

spark-worker:
  image: gettyimages/spark
  command: bin/spark-class org.apache.spark.deploy.worker.Worker spark://spark-master:7077
  container_name: spark-worker
  hostname: spark-worker
  environment:
    SPARK_CONF_DIR: /conf
    SPARK_WORKER_CORES: 2
    SPARK_WORKER_MEMORY: 512m
    SPARK_WORKER_PORT: 8881
    SPARK_WORKER_WEBUI_PORT: 8081
  links:
    - spark-master
  expose:
    - 7012
    - 7013
    - 7014
    - 7015
    - 7016
    - 8881
  ports:
    - 8081:8081
  volumes:
    - ./recs:/tmp/data
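
# To submit the job once the cluster is up (a sketch; assumes spark-submit is
# on the PATH inside the gettyimages/spark master container):
#   sudo docker exec spark-master spark-submit /tmp/data/spark.py
# spark.py already targets spark://spark-master:7077, so no --master flag is
# needed; access.log is visible at /tmp/data via the ./recs volume above.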
