-
Notifications
You must be signed in to change notification settings - Fork 0
/
spark-streaming.py
71 lines (54 loc) · 2.66 KB
/
spark-streaming.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from pyspark import SparkContext, SparkConf
from pyspark.streaming import StreamingContext
import numpy as np
import sys
from io import BytesIO
from StringIO import StringIO
from tensorflowonspark import TFCluster
import argparse
import inference_Sony_our
from datetime import datetime
# Spark configuration: run on YARN in client deploy mode. The combined
# 'yarn-client' master URL is deprecated since Spark 2.0; the master and the
# deploy mode are set separately instead.
conf = SparkConf()
conf.setMaster('yarn')
conf.set('spark.submit.deployMode', 'client')
# Ship the pyspark/py4j archives to the YARN containers and put them on the
# executors' PYTHONPATH.
conf.set('spark.yarn.dist.files', 'file:/usr/local/lib/python2.7/dist-packages/pyspark/python/lib/pyspark.zip,file:/usr/local/lib/python2.7/dist-packages/pyspark/python/lib/py4j-0.10.7-src.zip')
conf.setExecutorEnv('PYTHONPATH', 'pyspark.zip:py4j-0.10.7-src.zip')
conf.setAppName('spark-streaming')
conf.set("spark.dynamicAllocation.enabled", "false")

# Create the SparkContext shared by the streaming job and the TF cluster.
sc = SparkContext(conf=conf)
#sc.setLogLevel("FATAL")

# Number of executors requested through the Spark configuration; falls back
# to 1 when 'spark.executor.instances' is not set.
executors = sc.getConf().get("spark.executor.instances")
num_executors = int(executors) if executors is not None else 1
# arguments for Spark and TFoS
def _parse_bool_flag(value):
    """Convert a command-line token such as 'true' or '0' into a bool.

    Raises argparse.ArgumentTypeError for tokens that are not a recognised
    boolean spelling, so a typo is reported instead of silently enabling
    the option.
    """
    if isinstance(value, bool):
        return value
    token = value.strip().lower()
    if token in ('true', 't', 'yes', 'y', '1'):
        return True
    if token in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError(
        "expected a boolean value, got {0!r}".format(value))


parser = argparse.ArgumentParser()
parser.add_argument("--cluster_size", help="number of nodes in the cluster",
                    type=int, default=num_executors)
parser.add_argument("--num_ps", help="number of ps nodes", type=int, default=1)
# Accept both a bare `--tensorboard` and an explicit `--tensorboard True/False`.
# Without a type, any supplied value (even "False") would be a truthy string.
parser.add_argument("--tensorboard", help="launch tensorboard process",
                    nargs='?', const=True, default=False,
                    type=_parse_bool_flag)
parser.add_argument("--mode", help="train|inference", default="inference")
parser.add_argument("--epochs", help="number of epochs", type=int, default=1)
parser.add_argument("--steps", help="maximum number of steps", type=int, default=1)
parser.add_argument("--model", help="HDFS path to save/load model during train/inference",
                    default='hdfs://gpu10:9000/Sony_model/')
parser.add_argument("--output", help="HDFS path to save output file",
                    default='hdfs://gpu10:9000/Sony_output/')
args = parser.parse_args()
# Format explicitly so the line reads the same whether `print` is the
# Python 2 statement (which would otherwise print a tuple) or a function.
print("args: {0}".format(args))
# Log a timestamped marker so the start of the job is visible in the output.
start_stamp = datetime.now().isoformat()
print("{0} ===== Start".format(start_stamp))

# Streaming context on top of the existing SparkContext, with a batch
# duration of 30.
ssc = StreamingContext(sparkContext=sc, batchDuration=30)

# Directory monitored for new text files; each new file feeds the stream.
hdfs_path = 'hdfs://gpu10:9000/textnumpy/'
rawtextRDD = ssc.textFileStream(directory=hdfs_path)
def string2numpy(input, shape=(512, 512, 4)):
    """Parse whitespace-separated numeric text into a float32 array.

    Args:
        input: Text (or already-encoded bytes) holding the numbers in a
            format ``np.loadtxt`` accepts. The parameter name shadows the
            builtin but is kept so existing callers are unaffected.
        shape: Shape of the returned array. Defaults to ``(512, 512, 4)``.

    Returns:
        A ``numpy.ndarray`` of dtype float32 with the requested shape.

    Raises:
        ValueError: If the number of parsed values does not fit ``shape``.
    """
    # Text is encoded as latin1; bytes are used unchanged. The payload is
    # bytes either way, so it goes into a BytesIO buffer, which works on both
    # Python 2 and Python 3 (the StringIO module exists only on Python 2).
    raw = input if isinstance(input, bytes) else input.encode('latin1')
    values = np.loadtxt(BytesIO(raw), dtype=np.float32)
    return values.reshape(shape)
# Parse each text record of the stream into a numpy array.
imageRDD = rawtextRDD.map(string2numpy)
imageRDD.pprint()

# Start the TensorFlowOnSpark cluster, fed from Spark (InputMode.SPARK).
cluster = TFCluster.run(
    sc,
    inference_Sony_our.main_fun,
    args,
    args.cluster_size,
    args.num_ps,
    args.tensorboard,
    TFCluster.InputMode.SPARK,
)

# Register the inference pipeline on the stream and persist its output.
labelRDD = cluster.inference(imageRDD)
labelRDD.pprint()
labelRDD.saveAsTextFiles(args.output)

# The streaming computation must be started BEFORE the cluster is shut down;
# shutting down first would tear the TensorFlow nodes down before any batch
# reaches them. Passing the StreamingContext to shutdown() follows the
# TensorFlowOnSpark streaming pattern, in which shutdown waits on the stream.
ssc.start()  # Start the computation
cluster.shutdown(ssc)
# NOTE(review): expected to return promptly once shutdown(ssc) has finished;
# kept so the script still blocks until the context has terminated.
ssc.awaitTermination()  # Wait for the computation to terminate
print('stopped')