Distributed Machine Learning
with Kubernetes and general-purpose cluster computing
Yi Wang
#----------------------------------------------------------------------
# Jike and Ian want me to describe my experience of using Kubernetes
# for distributed machine learning. But I think that, rather than only
# explaining how I use it, it might be more useful to explain why I
# use it. My motivation is general-purpose cluster computing.
#----------------------------------------------------------------------
* Distributed Machine Learning:
Students' Mode:
- CUDA tower/laptop
Research Lab Mode:
- HPC cluster
- used by a small team
- a few kinds of jobs, focused on research topics
- special-purpose: run fast!
- a real example: GPU cluster for DNN training using sync SGD
#----------------------------------------------------------------------
# Several months ago I began to learn deep learning. I started by
# programming in Python on a CUDA laptop. This scale of parallel
# computing is small -- one CPU and one GPU. It seems that many people
# start with such a simple setting, but we all know that it is not
# enough for solving real problems.
#
# From there, I started training models using the computing cluster of
# our team. The cluster consists of CUDA GPU towers connected by a
# high-speed network. Such a setting is common in research labs.
#
# To make full use of GPUs, our team members wrote kernel programs
# specifically for the family of CUDA GPUs we are using. Also, our
# team members implemented an extremely efficient MPI AllReduce
# operation, which covers details like handling communications between
# GPUs on the same node and between GPUs on different nodes.
#
# This high performance system enabled fast iteration of research.
# However, it has some limits as well.
#----------------------------------------------------------------------
* Special-Purpose Cluster Computing
- High speedup, but low parallelism
- Fast jobs, but low cluster utilization
- Fast I/O, but not scalable storage
- Fast at a few kinds of jobs, but slow in general
#----------------------------------------------------------------------
#
# We highly optimized the AllReduce operation because the system runs
# the synchronous SGD algorithm, where the effective mini-batch size is
# the sum of the mini-batch sizes on all GPUs. If we use more than 32
# GPUs, the effective mini-batch becomes too big and the job takes too
# long to converge. This limits the parallelism.
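#
# The arithmetic behind this limit can be sketched in a few lines. The
# per-GPU batch size (64) and the dataset size (one million examples)
# are assumed numbers for illustration only; they are not from our
# system.

```python
# Assumed per-GPU mini-batch size, for illustration only.
PER_GPU_BATCH = 64


def effective_batch_size(num_gpus, per_gpu_batch=PER_GPU_BATCH):
    """In sync SGD every step consumes one mini-batch per GPU."""
    return num_gpus * per_gpu_batch


def steps_per_epoch(num_examples, num_gpus, per_gpu_batch=PER_GPU_BATCH):
    """Number of model updates per pass over the data (ceiling division)."""
    batch = effective_batch_size(num_gpus, per_gpu_batch)
    return -(-num_examples // batch)


# More GPUs mean a bigger effective batch and fewer updates per epoch.
for gpus in (1, 8, 32, 128):
    print(gpus, effective_batch_size(gpus), steps_per_epoch(1_000_000, gpus))
```

# Fewer updates per epoch is why adding GPUs eventually stops helping
# convergence, even when each step is fast.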
#
# Another problem is low utilization. Though the cluster has many
# nodes, each job cannot use many of them. So we need to run many jobs
# in parallel to make full use of these nodes. But in practice, we
# don't always have so many jobs.
#
# To accelerate the training of DNN models, the system loads data from
# local disks via high-performance adaptors. This implies that we have
# to duplicate the training data on all nodes. A few times, when we got
# more training data, we ran out of disk space. It is true that we can
# install more disks in each node, but there is a limit anyway.
#
# Another problem was that when we preprocessed new data, we ran out of
# disk space due to the large amount of intermediate results. We had to
# manually split the data into small chunks and process them one by one.
#
# We did consider building a Hadoop cluster in addition to our GPU
# cluster, but we cannot build a new cluster every time we face a new
# kind of job -- the maintenance cost would be super high and the
# overall utilization would be super low.
#
# An even more head-scratching problem: what if we face a new problem
# and no existing framework fits it? MapReduce, Spark, Storm, GraphLab,
# Petuum -- none of them fits all algorithms.
#----------------------------------------------------------------------
* Industrial Challenges
- Big data as log streams
- Unpredictably many kinds of jobs
- Better utilization of the cluster saves millions of dollars
#----------------------------------------------------------------------
# Unfortunately, all the aforementioned challenges become real when a
# research team grows to do more kinds of research or to serve real
# business:
#
# - An unbounded amount of data streams in around the clock, every day
# of the year, in the form of log messages generated by online Web
# services.
#
# - The team has to try many algorithms/solutions to make full use of
# the data and to constantly improve service quality.
#
# - There would inevitably be many kinds of jobs: online serving, online
# learning, offline data processing, and batch learning.
#
# - To fully utilize clusters, we need to run all these jobs on the
# same cluster: high-priority ones serving products, low-priority ones
# running experiments.
#
# What is the solution to such complexity?
#----------------------------------------------------------------------
* Industrial Solutions
General-purpose cluster computing
- Distributed storage
- Cluster management systems
- Distributed computing frameworks
- Deployment
- Versions
- Job isolation
- Scalability and auto fault recovery
* Distributed Storage
Distributed filesystem
- Less data duplication than HPC
Distributed in-memory storage
- Distributed cache: memcached
- Distributed NoSQL: Bigtable, HBase, Redis, ...
#----------------------------------------------------------------------
# First of all, instead of duplicating data over all nodes, we need a
# distributed filesystem like Hadoop HDFS or Google GFS, which, for
# reliability, keeps only a limited number of replicas. For example,
# each chunk of data is replicated on 3 or 5 nodes. Less duplication
# means lower aggregate read throughput, but it also means much more
# usable storage. For high-performance I/O, we can use caching services
# (e.g., memcached) or NoSQL databases (e.g., Bigtable and Redis) that
# can serve data from memory.
#----------------------------------------------------------------------
* Kubernetes: Cluster Management
One cluster runs all jobs
- YARN and Mesos
- DC-OS
- Kubernetes
Cluster management systems should provide APIs, not frameworks.
#----------------------------------------------------------------------
# Then we need a common software platform, a distributed operating
# system or cluster management system, to run the jobs -- both machine
# learning jobs and standard ones like memcached and Redis. Possible
# open source choices include YARN, Mesos and Kubernetes. Their designs
# were all influenced by Google Borg.
#
# Both Borg and Kubernetes, an open source system written in Go and
# modeled on Borg, can run arbitrary programs, for example, nginx and
# memcached. Users usually write a job description file that tells the
# system things like the preferred number of instances, the resource
# requirements of each instance, and the maximum number of attempts to
# restart failed workers.
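#
# As a rough sketch, such a job description can be written as a
# Kubernetes `batch/v1` Job manifest. The Python below only builds the
# manifest as a dict and prints it as JSON. The job name, image name
# and resource numbers are hypothetical, and the helper `make_job` is
# made up for illustration; it is not part of any Kubernetes client.

```python
import json


def make_job(name, image, workers, cpus, memory, max_retries):
    """Build a Kubernetes batch/v1 Job manifest as a plain dict."""
    return {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {"name": name},
        "spec": {
            "parallelism": workers,       # pods running at the same time
            "completions": workers,       # pods that must finish successfully
            "backoffLimit": max_retries,  # retries before the Job is marked failed
            "template": {
                "spec": {
                    "restartPolicy": "OnFailure",
                    "containers": [{
                        "name": "worker",
                        "image": image,
                        # resources requested for each worker instance
                        "resources": {"requests": {"cpu": cpus, "memory": memory}},
                    }],
                }
            },
        },
    }


# Hypothetical values: 4 workers, 2 CPUs and 4 GiB each, up to 3 retries.
manifest = make_job("train-dnn", "example.com/trainer:v1", 4, "2", "4Gi", 3)
print(json.dumps(manifest, indent=2))
```

# The three fields map onto the three pieces of information above:
# `parallelism` for the number of instances, `resources.requests` for
# the per-instance requirement, and `backoffLimit` for restart attempts.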
#
# Mesos runs only programs written to fit its framework. In order to
# run Hadoop MapReduce or nginx, we have to write C++ adaptor programs,
# which, once launched by Mesos, start Hadoop and/or nginx. Similarly,
# YARN runs only programs following some patterns.
#----------------------------------------------------------------------
* Distributed Computing Frameworks
Framework creators tend to over-advertise general applicability:
- Hadoop Mahout
- Spark MLlib
- GraphLab libraries
Application-driven developers create frameworks for specific purposes:
- MapReduce
- Pregel
- SETI
Kubernetes makes it easy to create frameworks.
#----------------------------------------------------------------------
# To ease the effort of writing distributed programs that run on the
# cluster, it is a good idea to hide the common part that handles
# concurrency and communication in a library, and let users write only
# the task-specific part. These libraries are called distributed
# computing frameworks.
#
# The machine learning community has long been acquainted with MapReduce,
# GraphLab and Spark. To show business value, framework developers
# tend to show that many algorithms fit in their framework. But the
# reality is that there is a trade-off between general applicability and
# runtime performance.
#
# Google showed us a good example -- one framework for each type of
# job: MapReduce for batch index building and other batch data
# processing, Pregel for efficient PageRank and other graph
# computations, GBR for maximum entropy model training and other
# AllReduce-based algorithms, and SETI specifically for click-through
# rate prediction and feature learning.
#
# Borg and Kubernetes were designed to provide neat APIs that help
# engineers and researchers easily write their own frameworks,
# specifically tailored for their new algorithms, without requiring
# users to know the details shielded by the cluster management system.
#----------------------------------------------------------------------
* Deployment, Versioning, Rollout/Rollback
A straightforward and commonly used way:
- build the program into a binary executable file,
- tar the binary file, configuration file, and shared libraries,
- scp the tarball to all nodes, and untar,
- start the program on some nodes,
- remove the untar-ed files after the job finishes.
Kubernetes
- deploy once and run many times
- roll out new versions
- roll back to old versions
#----------------------------------------------------------------------
# It is inefficient to deploy and remove a program every time we run a
# job, because we might run a program multiple times. However, it is
# hard manual labor to maintain a cache of deployed programs. Consider
# that once a new version of the program is released, we need to make
# sure that newly started jobs run the new version, without killing
# existing jobs that run the old version. This is called *rollout*. A
# similar problem is *rollback*: if we find that the newly released
# version has a bug, we need to go back to the old version.
#----------------------------------------------------------------------
* Dynamic Scheduling and Elastic Scaling
An example
- an experiment job A is using all 100 GPUs
- a production job B starts and needs 50 GPUs
- MPI would need to kill A to free resources for B
- Is research kind of second-class work?!
Kubernetes
- Kubernetes kills 50 workers of A to free resources for B.
- After B completes, Kubernetes restarts those 50 workers of A.
- If A runs asynchronous SGD instead of sync SGD, it can continue with 50 workers.
#----------------------------------------------------------------------
# Consider the case where an experimental model training job is using
# all 100 GPUs in the cluster, and a production job gets started and
# asks for 50 GPUs. If we use MPI, we'd have to kill the experiment job
# to release enough resources to run the production job. This tends to
# make the owner of the experiment job feel that they are doing a
# "second-class" job.
#
# Kubernetes is smarter than MPI in that it can kill only 50 workers of
# the experiment job, allowing both jobs to run at the same time. But
# this also requires that we change our program -- we have to abandon
# the highly optimized AllReduce operation, because it would block if
# some workers are killed and cannot join this collective operation.
#
# Remember that the reason we use AllReduce is that we use synchronous
# SGD. So in order to write a distributed DNN training program that
# tolerates preemption, we might have to use asynchronous SGD, even if
# asynchronous SGD is arguably slower than its synchronous counterpart.
#
# With asynchronous SGD, workers don't talk with each other; instead,
# each worker occasionally talks with the parameter server. If some
# workers get preempted by higher-priority jobs, other workers can go on
# with the training job. Once a preempted worker gets restarted, it
# can ask the parameter server for the current model and re-join the
# job. If the parameter server itself gets preempted, all workers can
# go on updating their local models for a while without talking to the
# parameter server. Once the parameter server gets restarted, workers
# can resume the occasional updates with the parameter server.
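#
# A toy, single-process sketch of this idea follows. It is not our
# training system: the "model" is one float, the loss (w - 3)^2 and the
# learning rate are assumed for illustration, and workers take turns
# instead of running concurrently. It only shows that training keeps
# making progress while one worker is preempted and later re-joins.

```python
class ParameterServer:
    """Holds the shared model (a single float here) and applies updates."""

    def __init__(self, w=0.0, lr=0.1):
        self.w, self.lr = w, lr

    def pull(self):
        return self.w

    def push(self, grad):
        self.w -= self.lr * grad


class Worker:
    def __init__(self, target):
        self.target = target  # this worker's data pulls w toward `target`
        self.alive = True

    def step(self, server):
        w = server.pull()                     # fetch the current model
        server.push(2.0 * (w - self.target))  # gradient of (w - target)^2


def train(server, workers, rounds, preempt_at=None, restart_at=None):
    """Run `rounds` rounds; worker 0 is preempted and later restarted."""
    for r in range(rounds):
        if r == preempt_at:
            workers[0].alive = False  # a higher-priority job takes its GPU
        if r == restart_at:
            workers[0].alive = True   # worker is rescheduled and re-joins
        for wk in workers:
            if wk.alive:              # the other workers never wait for it
                wk.step(server)
    return server.w


server = ParameterServer()
workers = [Worker(3.0) for _ in range(4)]
final_w = train(server, workers, rounds=50, preempt_at=5, restart_at=20)
print(final_w)
```

# Note that no worker ever blocks on another worker; a preempted worker
# only needs `pull()` to catch up when it comes back.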
#----------------------------------------------------------------------
* Auto Fault Recovery and Scalable Computing
- Scalability is not just speedup. It depends on auto fault recovery!
- Preemption happens. Jobs never end without recovery.
- Auto fault recovery is the job of the framework.
- Frameworks need to restrict inter-worker data dependencies.
Some examples:
- MPI: arbitrary inter-worker communication, no way to do auto fault recovery.
- MapReduce: prohibits inter-worker communication, less generally applicable.
- Pregel: finds a balance between recovery and generality.
#----------------------------------------------------------------------
# The above example introduces the concept of auto fault recovery,
# which is important if we want scalability.
#
# In a general-purpose cluster, there is always a chance that some
# workers get preempted by higher-priority jobs. The more workers a
# job has and the longer it runs, the higher the probability that some
# workers get preempted during the run. It is true that we can restart
# a job, but if the system is not automatically recoverable, each
# restart re-runs the job from the very beginning. The job might then
# never complete, because every time it is restarted, it is likely to
# be preempted again.
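#
# To put rough numbers on this, here is a back-of-the-envelope
# calculation. It assumes that preemptions are independent and that each
# worker has the same constant chance of being preempted in any given
# hour; the rate 0.001 is made up for illustration.

```python
def p_job_interrupted(p_preempt_per_hour, workers, hours):
    """P(at least one of `workers` is preempted within `hours`),
    assuming independent preemptions with a constant hourly probability."""
    p_all_survive = (1.0 - p_preempt_per_hour) ** (workers * hours)
    return 1.0 - p_all_survive


# Bigger and longer jobs are interrupted with higher probability.
for workers, hours in [(10, 1), (100, 10), (1000, 24)]:
    print(workers, hours, round(p_job_interrupted(0.001, workers, hours), 4))
```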
#
# The key to auto fault recovery is to cut off the data dependencies
# between workers. With MPI, every worker can talk to any other worker
# at any time. Suppose that we want to restart a worker A and bring it
# back to the state right before it was preempted. We then need to
# re-send to A all the messages it received during its previous life,
# which means we need to restart all the workers that talked to A. Such
# dependencies propagate, and often all workers need to be restarted,
# so the job restarts from the very beginning.
#
# The MapReduce framework is well known for its capability of auto fault
# recovery. A MapReduce job contains three phases: map, shuffle, and
# reduce. Map and reduce workers are not allowed to communicate with
# each other, so there are no data dependencies within the map and
# reduce phases. Once a worker gets restarted, it can continue with the
# unfinished task or the next task. A limited form of data exchange
# happens in the shuffle phase, but it is hidden from the user and taken
# care of by the MapReduce framework. MapReduce is efficient at fault
# recovery and is very scalable, but it is not flexible and might not
# fit algorithms that do need inter-worker communication.
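#
# A minimal in-process sketch of why this makes recovery cheap is given
# below, using word count. It is a toy, not Hadoop or Google MapReduce:
# the function names and the `fail_once` parameter are made up for
# illustration. Because a map task depends only on its own input chunk,
# a lost task can be re-run alone and the result is unchanged.

```python
from collections import defaultdict


def map_task(chunk):
    """Map: emit (word, 1) pairs; depends only on its own input chunk."""
    return [(word, 1) for word in chunk.split()]


def shuffle(mapped):
    """Shuffle: group values by key; done by the framework, not the user."""
    groups = defaultdict(list)
    for pairs in mapped:
        for key, value in pairs:
            groups[key].append(value)
    return groups


def reduce_task(key, values):
    """Reduce: depends only on the values grouped under one key."""
    return key, sum(values)


def run_job(chunks, fail_once=frozenset()):
    """Run word count; map tasks listed in `fail_once` are lost once."""
    pending = set(fail_once)
    mapped = []
    for i, chunk in enumerate(chunks):
        while True:
            if i in pending:
                pending.discard(i)  # the first attempt is lost ...
                continue            # ... so re-run only this one task
            mapped.append(map_task(chunk))
            break
    return dict(reduce_task(k, v) for k, v in shuffle(mapped).items())


chunks = ["a b a", "b c", "a c c"]
print(run_job(chunks, fail_once={1}))
```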
#
# Pregel, another Google framework, sits between MapReduce, the extreme
# of scalability, and MPI, the extreme of flexibility. In Pregel, the
# computation is defined as a sequence of *super-steps*. Within each
# super-step, workers can communicate with each other. At the end of
# each super-step, the framework checkpoints all messages sent between
# workers during the super-step, so that if some workers get preempted,
# the job restarts and resumes from the most recent checkpoint.
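#
# A toy sketch of super-steps with checkpoints follows. It is not
# Pregel itself: it runs in one process, it checkpoints vertex values
# rather than messages, and it checkpoints every `checkpoint_every`
# super-steps (an assumed parameter) so that the simulated crash
# actually loses some work. The algorithm propagates the maximum value
# through a small, made-up graph.

```python
def superstep(values, edges):
    """Every vertex sends its value to its neighbours and then keeps the
    maximum of its own value and the messages it received."""
    inbox = {v: [] for v in values}
    for src, dsts in edges.items():
        for dst in dsts:
            inbox[dst].append(values[src])
    return {v: max([values[v]] + inbox[v]) for v in values}


def run(values, edges, checkpoint_every=2, crash_at=None):
    """Iterate super-steps until no value changes; crash once if asked."""
    step = 0
    checkpoint = (step, dict(values))  # durable copy of the state
    while True:
        if step == crash_at:
            crash_at = None            # crash only once
            step, values = checkpoint[0], dict(checkpoint[1])
            continue                   # redo the work since the checkpoint
        new_values = superstep(values, edges)
        step += 1
        if new_values == values:       # no vertex changed: done
            return new_values
        values = new_values
        if step % checkpoint_every == 0:
            checkpoint = (step, dict(values))


# A chain 0 - 1 - 2 - 3 with undirected edges.
edges = {0: [1], 1: [0, 2], 2: [1, 3], 3: [2]}
values = {0: 3, 1: 6, 2: 2, 3: 1}
print(run(values, edges, crash_at=1))
```

# The job with a crash reaches the same answer as the job without one;
# it only repeats the super-steps after the last checkpoint.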
#----------------------------------------------------------------------
* Isolation and Quota
An example
- Job A periodically clears temporary results using `rm /tmp/*`
- Job B checkpoints to /tmp
- When A clears, B loses its checkpoints
A hard-to-detect bug, as it rarely reproduces.
Kubernetes
- run workers of jobs in Docker containers.
- each container has its own filesystem and network port space.
#----------------------------------------------------------------------
# Cutting off data dependencies is not easy. In the above examples, we
# talked about eliminating network communication between workers. But
# there are other forms of data dependency, for example, via the
# filesystem.
#
# Consider a job A that writes intermediate results to the `/tmp`
# directory and removes them periodically by calling `rm /tmp/*`.
# Another job B writes checkpoints to `/tmp`. A might unintentionally
# remove the checkpoints of B. It is very hard to detect this kind of
# bug, since the error rarely reproduces.
#
# With Kubernetes, people have to build their programs into Docker
# *images* that run as Docker *containers*. Each container has its own
# filesystem and network port space. When A runs as a container, it
# removes only files in its own `/tmp` directory. This is to some
# extent like defining C++ classes in namespaces, which helps us
# avoid class name conflicts.
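#
# The effect can be imitated without containers. The sketch below does
# not use Docker; it stands in for a shared `/tmp` with one temporary
# directory used by both jobs, and for per-container filesystems with
# one private directory per job. The file name `b.ckpt` and the helper
# functions are made up for illustration.

```python
import os
import tempfile


def cleanup(tmp_dir):
    """Job A's periodic cleanup: the equivalent of `rm <tmp_dir>/*`."""
    for name in os.listdir(tmp_dir):
        os.remove(os.path.join(tmp_dir, name))


def write_checkpoint(tmp_dir):
    """Job B saves a checkpoint file and returns its path."""
    path = os.path.join(tmp_dir, "b.ckpt")
    with open(path, "w") as f:
        f.write("model state")
    return path


# Shared /tmp, simulated by one directory used by both jobs.
with tempfile.TemporaryDirectory() as shared_tmp:
    ckpt = write_checkpoint(shared_tmp)
    cleanup(shared_tmp)  # A deletes B's checkpoint
    lost_when_shared = not os.path.exists(ckpt)

# Isolated /tmp, simulated by one private directory per job.
with tempfile.TemporaryDirectory() as a_tmp, \
        tempfile.TemporaryDirectory() as b_tmp:
    ckpt = write_checkpoint(b_tmp)
    cleanup(a_tmp)       # A only sees its own files
    kept_when_isolated = os.path.exists(ckpt)

print(lost_when_shared, kept_when_isolated)
```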
#----------------------------------------------------------------------
* Kubernetes Cluster
- Linux kernel >3.10 and Docker container
- distributed filesystem: HDFS or Ceph
- Kubernetes
- many frameworks for many kinds of jobs
- create new frameworks for new algorithms
Remember to change models or learning algorithms so as to control data
dependencies and ease the development of frameworks.
The community is making Kubernetes job schedulers take GPUs into
consideration.
#----------------------------------------------------------------------
# If we use Kubernetes, the general picture of computing is as
# follows:
#
# - The cluster has a distributed filesystem like HDFS, which logically
# organizes all disk space into a single big storage.
#
# - We can run in-memory storage systems to preload the data from HDFS
# if we want high performance I/O.
#
# - Kubernetes manages hardware resources and monitors workers/jobs. A
# job is described by the number of workers and the amount of resources
# needed by each worker. Kubernetes starts workers on nodes that
# have enough idle resources.
#
# - Kubernetes can run arbitrary programs, including those programs
# depending on frameworks. Kubernetes APIs make it easy for
# researchers to write their own framework if they want to.
#
# - The design of an algorithm and its parallel implementation are
# highly correlated. Recall the synchronous and asynchronous SGD
# example. So it is likely that a valuable algorithm needs a specially
# tailored parallel computing framework.
#
# - We often need to change the algorithm so as to control the data
# dependencies between workers. This makes it easier for us to design
# a fault-recoverable and scalable framework for the algorithm.
#----------------------------------------------------------------------
* An Example
The tech stack
  Application    : ASR server   Kafka   Log joiner   model trainer
  Framework      : nginx                Storm        Torch
  Cluster Mgmt   : Kubernetes
  Distributed FS : Ceph
  Locking service: etcd
  Local OS       : CoreOS
The data flow
  [mobile apps] <----> [ASR server] --log--> [Kafka]
                            ↑                   |
                            |          utterances & user-
                            |          satisfied transcriptions
                            |                   ↓
                          Ceph <-model-- [model trainer]
# LocalWords: Jike Caffe CUDA GPUs Nvidia cuBLAS cuDNN MPI SGD
# LocalWords: AllReduce DNN Hadoop HDFS flume MapReduce GFS GBR
# LocalWords: memcached NoSQL Bigtable Redis Pregel PageRank scp
# LocalWords: SETI Mesos nginx APIs Versioning Rollout untar tmp
# LocalWords: versioning rollout DistBelief MPICH BorgMPI PLSA
# LocalWords: rm cgroup API Github Kubernetes Yi Apr Tensorflow
# LocalWords: preprocess GraphLab Petuum tarball CI HPC HBase DC
# LocalWords: Mahout MLlib Graphlab Ceph ASR Mgmt FS Ubuntu apps