This repository has been archived by the owner on Dec 4, 2024. It is now read-only.

Disparity between CPU allocation and CPU use on Apache spark jobs #74

Open
mooperd opened this issue Oct 18, 2016 · 0 comments

mooperd commented Oct 18, 2016

Resource allocation for Apache Spark jobs on DC/OS seems a bit murky.

For example, launching a job with:

```bash
dcos spark run --submit-args='http://foobar/prog.py'
```

allocates a lot of cluster resources, but when checking `top` you only see a couple of processes using CPU. To get any real throughput out of the job, you need to tell the driver how much parallelism to use by setting `spark.default.parallelism`:

```bash
dcos spark run --submit-args='-Dspark.default.parallelism=50 http://foobar/prog.py'
```
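The effect can be reasoned about with a small pure-Python model. When `sc.parallelize` is called without `numSlices`, it uses `sc.defaultParallelism`, and each resulting partition becomes one task, so a job over 365 keys split into 2 partitions can keep at most 2 cores busy in that stage no matter how many were allocated. The sketch below assumes even, contiguous slicing like that of Spark's `ParallelCollectionRDD` (PySpark batches items first, so real slices may differ slightly); it does not call Spark, and the key names are made up.

```python
def slice_positions(length, num_slices):
    """Return (start, end) index pairs that split `length` items into
    `num_slices` contiguous, nearly equal slices."""
    if num_slices < 1:
        raise ValueError("num_slices must be at least 1")
    return [
        (i * length // num_slices, (i + 1) * length // num_slices)
        for i in range(num_slices)
    ]


def slice_list(items, num_slices):
    """Split a list into partitions using the even slicing above."""
    return [items[start:end] for start, end in slice_positions(len(items), num_slices)]


# Made-up stand-ins for the 365 S3 keys selected by Prefix='1971'.
keys = ["1971-%03d" % day for day in range(1, 366)]

for n in (2, 50):
    parts = slice_list(keys, n)
    sizes = sorted({len(p) for p in parts})
    # One task per partition, so n partitions allow at most n concurrent tasks.
    print(n, "partitions -> at most", n, "concurrent tasks; keys per task:", sizes)
```

With 2 partitions each task handles 182 or 183 keys; with 50 partitions each handles 7 or 8, so many more cores can work at once. Asking for more slices than items simply produces empty partitions.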

At least to me, it's a bit opaque what is going on here, and the `dcos spark run` wrapper around `spark-submit` does not make this any easier.

The PySpark application I am running takes a list and does the following with it. Since the driver does not know what is in the list, and cannot infer anything from how the input is partitioned, maybe it does not know how many executors to use or start?

```python
job = sc.parallelize(list)
foo = job.flatMap(function)
```
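Part of the answer to that question: the number of tasks in the `flatMap` stage equals the number of partitions `sc.parallelize` creates, and when `numSlices` is omitted that is `sc.defaultParallelism`, regardless of what the list contains. Instead of setting `spark.default.parallelism` globally, the slice count can also be passed per call. The helper below is a hypothetical heuristic, not part of Spark: it aims for a few tasks per allocated core, following the Spark tuning guide's suggestion of 2 to 3 tasks per CPU core, and caps the count at the number of items because one S3 key cannot be split further.

```python
def choose_num_slices(num_items, total_cores, tasks_per_core=2):
    """Hypothetical helper: pick numSlices for sc.parallelize(items, numSlices=...).

    Aims for `tasks_per_core` tasks per allocated core, but never more
    partitions than items, since each item (one S3 key) is indivisible.
    """
    if num_items < 1:
        return 1  # parallelize still needs at least one slice
    target = max(1, total_cores * tasks_per_core)
    return min(num_items, target)
```

Assuming `total_cores` is whatever was allocated to the job (for example the value set for `spark.cores.max`), the call would look like `job = sc.parallelize(s3_list, numSlices=choose_num_slices(len(s3_list), total_cores))`, and `job.getNumPartitions()` reports the resulting partition count.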

The program:

```python
import boto3
import ujson
import arrow
from pyspark.sql import SQLContext
from pyspark import SparkContext

# Build the list of S3 keys we want to process (this runs on the driver).
s3_list = []
s3 = boto3.resource('s3', aws_access_key_id='KEY', aws_secret_access_key='KEY')
my_bucket = s3.Bucket('time-waits-for-no-man')
# Prefix='1971' selects 365 files, Prefix='1971-01' selects 31 files,
# Prefix='1971-01-1' selects 10 files (days 10 to 19).
for obj in my_bucket.objects.filter(Prefix='1971'):
    s3_list.append(obj.key)

def add_timestamp(record):
    # The sample data spells the minute field "minuite", so accept either spelling.
    minute = record.get('minute', record.get('minuite'))
    record['timestamp'] = arrow.get(
                          int(record['year']),
                          int(record['month']),
                          int(record['day']),
                          int(record['hour']),
                          int(minute),
                          int(record['second'])
                          ).timestamp  # a property on arrow < 1.0; arrow >= 1.0 uses .int_timestamp
    return record

def distributedJsonRead(s3Key):
    # Runs on an executor: fetch one S3 object and parse it as JSON Lines.
    s3obj = boto3.resource('s3', aws_access_key_id='KEY', aws_secret_access_key='KEY').Object(bucket_name='time-waits-for-no-man', key=s3Key)
    contents = s3obj.get()['Body'].read().decode()
    meow = contents.splitlines()
    result_wo_timestamp = map(ujson.loads, meow)
    result_wi_timestamp = map(add_timestamp, result_wo_timestamp)
    return result_wi_timestamp

sc = SparkContext()
sqlContext = SQLContext(sc)  # constructing a SQLContext also attaches toDF() to RDDs
job = sc.parallelize(s3_list)  # no numSlices given, so sc.defaultParallelism partitions
foo = job.flatMap(distributedJsonRead)  # each S3 key expands into many records
df = foo.toDF()
#df.show()
blah = df.count()
print(blah)
df.printSchema()

#df.write.parquet('dates_by_seconds', mode="overwrite", partitionBy=["second"])
sc.stop()
exit()
```
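The program above creates a new boto3 resource for every S3 key inside `distributedJsonRead`. A common alternative, sketched here as an assumption rather than as a fix for the parallelism question, is `rdd.mapPartitions`, which calls a function once per partition so one client can be reused for all keys in it. The names `read_partition` and `make_s3_fetcher` are hypothetical; the sketch uses the stdlib `json` module in place of `ujson`, and `make_s3_fetcher` relies on boto3's default credential chain instead of the explicit keys used above.

```python
import json


def read_partition(keys, make_fetcher):
    """Parse every JSON Lines object named in `keys`, one partition at a time.

    `make_fetcher` is called once per partition and must return a function
    mapping an S3 key to the object's text, so an expensive client is built
    once per partition rather than once per key.
    """
    fetch = make_fetcher()
    for key in keys:
        for line in fetch(key).splitlines():
            if line.strip():  # skip blank lines
                yield json.loads(line)


def make_s3_fetcher():
    """Hypothetical helper: read object bodies from the bucket with boto3."""
    import boto3  # imported lazily so the parsing logic can be tested without boto3

    bucket = boto3.resource('s3').Bucket('time-waits-for-no-man')
    return lambda key: bucket.Object(key).get()['Body'].read().decode()
```

In the job this would be used as `foo = job.mapPartitions(lambda keys: read_partition(keys, make_s3_fetcher)).map(add_timestamp)`. It does not change how many tasks run; that is still set by the partition count.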

A sample of the test data in S3 (JSON Lines, one file for every day in Unix time, 1970 to 2038):

```json
{"hour": "00", "month": "01", "second": "00", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}
{"hour": "00", "month": "01", "second": "01", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}
{"hour": "00", "month": "01", "second": "02", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}
{"hour": "00", "month": "01", "second": "03", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}
{"hour": "00", "month": "01", "second": "04", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}
{"hour": "00", "month": "01", "second": "05", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}
{"hour": "00", "month": "01", "second": "06", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}
{"hour": "00", "month": "01", "second": "07", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}
{"hour": "00", "month": "01", "second": "08", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}
{"hour": "00", "month": "01", "second": "09", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}
```
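As a sanity check on the `timestamp` values the job should produce, here is a stdlib-only sketch that converts one sample record to Unix seconds. It uses `calendar.timegm` in place of `arrow` and assumes the fields are UTC, as the `-00:00` timezone suggests; `record_to_epoch` is a hypothetical name. Note that the sample records spell the minute field `minuite`, so the sketch accepts either spelling.

```python
import calendar
import json

SAMPLE = '{"hour": "00", "month": "01", "second": "05", "minuite": "00", "year": "1970", "timezone": "-00:00", "day": "02"}'


def record_to_epoch(record):
    """Convert one record to Unix seconds, treating its fields as UTC."""
    minute = record.get("minute", record.get("minuite"))  # the data spells it "minuite"
    # calendar.timegm reads (year, month, day, hour, minute, second) from the tuple.
    return calendar.timegm((
        int(record["year"]), int(record["month"]), int(record["day"]),
        int(record["hour"]), int(minute), int(record["second"]),
        0, 0, 0,
    ))


print(record_to_epoch(json.loads(SAMPLE)))  # 86405: one day plus five seconds
```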