-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathindex.html
1620 lines (1352 loc) · 46.1 KB
/
index.html
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
<!DOCTYPE HTML>
<html>
<head>
<meta http-equiv="content-type" content="text/html; charset=utf-8">
<script src="jquery.js"></script>
<script src="highlight.min.js"></script>
<script src="nav.js"></script>
<!-- Code Monospace Font -->
<link href='http://fonts.googleapis.com/css?family=Inconsolata' rel='stylesheet'>
<link rel="stylesheet" href="base.css">
<link rel="stylesheet" href="skeleton.css">
<link rel="stylesheet" href="layout.css">
<link rel="stylesheet" href="gevent.css">
<!-- Syntax Highlighting Theme -->
<link rel="stylesheet" href="github.min.css">
<title>Gevent指南</title>
</head>
<style>
</style>
<body>
<div class="container">
<div id="sidebar" class="three columns sidebar">
<nav>
</nav>
</div>
<div class="twelve columns offset-by-three content">
<header>
<h1><span class="green">gevent</span>程序员指南</h1>
<h3 class="author">
由Gevent社区编写
</h3>
<blockquote>
gevent是一个基于<a href="http://software.schmorp.de/pkg/libev.html">libev</a>的并发库。它为各种并发和网络相关的任务提供了整洁的API。
</blockquote>
</header>
<div class="toc">
<ul>
<li><a href="#">介绍</a><ul>
<li><a href="#_1">贡献者</a></li>
</ul>
</li>
<li><a href="#_2">核心部分</a><ul>
<li><a href="#greenlets">Greenlets</a></li>
<li><a href="#_3">同步和异步执行</a></li>
<li><a href="#_4">确定性</a></li>
<li><a href="#greenlets_1">创建Greenlets</a></li>
<li><a href="#greenlet">Greenlet状态</a></li>
<li><a href="#_5">程序停止</a></li>
<li><a href="#_6">超时</a></li>
<li><a href="#monkey-patching">猴子补丁(Monkey patching)</a></li>
</ul>
</li>
<li><a href="#_7">数据结构</a><ul>
<li><a href="#_8">事件</a></li>
<li><a href="#_9">队列</a></li>
<li><a href="#_10">组和池</a></li>
<li><a href="#_11">锁和信号量</a></li>
<li><a href="#_12">线程局部变量</a></li>
<li><a href="#_13">子进程</a></li>
<li><a href="#actors">Actors</a></li>
</ul>
</li>
<li><a href="#_14">真实世界的应用</a><ul>
<li><a href="#gevent-zeromq">Gevent ZeroMQ</a></li>
<li><a href="#server">简单server</a></li>
<li><a href="#wsgi-servers">WSGI Servers</a></li>
<li><a href="#server_1">流式server</a></li>
<li><a href="#long-polling">Long Polling</a></li>
<li><a href="#websockets">Websockets</a></li>
<li><a href="#server_2">聊天server</a></li>
</ul>
</li>
</ul>
</div>
<h1 id="">介绍</h1>
<p>本指南假定读者有中级Python水平,但不要求有其它更多的知识,不期待读者有
并发方面的知识。本指南的目标在于给予你需要的工具来开始使用gevent,帮助你
驯服现有的并发问题,并从今开始编写异步应用程序。</p>
<h3 id="_1">贡献者</h3>
<p>按提供贡献的时间先后顺序列出如下:
<a href="http://www.stephendiehl.com">Stephen Diehl</a>
<a href="https://github.com/jerem">Jérémy Bethmont</a>
<a href="https://github.com/sww">sww</a>
<a href="https://github.com/brunoqc">Bruno Bigras</a>
<a href="https://github.com/dripton">David Ripton</a>
<a href="https://github.com/traviscline">Travis Cline</a>
<a href="https://github.com/Lothiraldan">Boris Feld</a>
<a href="https://github.com/youngsterxyf">youngsterxyf</a>
<a href="https://github.com/ehebert">Eddie Hebert</a>
<a href="http://notmyidea.org">Alexis Metaireau</a>
<a href="https://github.com/djv">Daniel Velkov</a></p>
<p>同时感谢Denis Bilenko写了gevent和相应的指导以形成本指南。</p>
<p>这是一个以MIT许可证发布的协作文档。你想添加一些内容?或看见一个排版错误?
Fork一个分支发布一个request到
<a href="https://github.com/sdiehl/gevent-tutorial">Github</a>.
我们欢迎任何贡献。</p>
<p>本文原文地址为:<a href="http://xlambda.com/gevent-tutorial/">http://xlambda.com/gevent-tutorial/</a>, 由<a href="http://jerrywin.com">jerrypy</a> 稍作修改和添加注释。本页也有<a href="http://methane.github.com/gevent-tutorial-ja">日文版本</a>。</p>
<h1 id="_2">核心部分</h1>
<h2 id="greenlets">Greenlets</h2>
<p>在gevent中用到的主要模式是<strong>Greenlet</strong>,
它是以C扩展模块形式接入Python的轻量级协程。
Greenlet全部运行在主程序操作系统进程的内部,但它们被协作式地调度。</p>
<blockquote>
<p>在任何时刻,只有一个协程在运行。</p>
</blockquote>
<p>这与<code>multiprocessing</code>或<code>threading</code>等提供真正并行构造的库是不同的。
这些库轮转使用操作系统调度的进程和线程,是真正的并行。</p>
<p class="comment"><strong>注释</strong>:TODO 这里解释一下关键的区别。</p>
<h2 id="_3">同步和异步执行</h2>
<p>并发的核心思想在于,大的任务可以分解成一系列的子任务,后者可以被调度成
同时执行或<em>异步</em>执行,而不是一次一个地或者<em>同步</em>地执行。两个子任务之间的
切换也就是<em>上下文切换</em>。</p>
<p>在gevent里面,上下文切换是通过<em>yielding</em>来完成的. 在下面的例子里,
我们有两个上下文,通过调用<code>gevent.sleep(0)</code>,它们各自yield向对方。</p>
<pre><code class="python">
import gevent
def foo():
print('Running in foo')
gevent.sleep(0)
print('Explicit context switch to foo again')
def bar():
print('Explicit context to bar')
gevent.sleep(0)
print('Implicit context switch back to bar')
gevent.joinall([
gevent.spawn(foo),
gevent.spawn(bar),
])
</pre>
<p></code>
<pre><code class="python">
Running in foo
Explicit context to bar
Explicit context switch to foo again
Implicit context switch back to bar
</pre></code></p>
<p>下图将控制流形象化,就像在调试器中单步执行整个程序,以说明上下文切换如何发生。</p>
<p><img alt="Greenlet Control Flow" src="flow.gif" /></p>
<p class="comment"><strong>注释</strong>:在当前版本gevent的实现中,<code>sleep(0)</code>(默认值)意味着把执行权让给其它正在运行的greenlet。具体见官方文档中<a href="http://www.gevent.org/gevent.html#sleeping">sleep</a>的用法。</p>
<p>当我们在受限于网络或IO的函数中使用gevent,这些函数会被协作式的调度,
gevent的真正能力会得到发挥。Gevent处理了所有的细节,
来保证你的网络库会在可能的时候,隐式交出greenlet上下文的执行权。
这样的一种用法是如何强大,怎么强调都不为过。或者我们举些例子来详述。</p>
<p>下面例子中的<code>select()</code>函数通常是一个在各种文件描述符上轮询的阻塞调用。</p>
<pre><code class="python">
import time
import gevent
from gevent import select
start = time.time()
tic = lambda: 'at %1.1f seconds' % (time.time() - start)
def gr1():
# Busy waits for a second, but we don't want to stick around...
print('Started Polling: %s' % tic())
select.select([], [], [], 2)
print('Ended Polling: %s' % tic())
def gr2():
# Busy waits for a second, but we don't want to stick around...
print('Started Polling: %s' % tic())
select.select([], [], [], 2)
print('Ended Polling: %s' % tic())
def gr3():
print("Hey lets do some stuff while the greenlets poll, %s" % tic())
gevent.sleep(1)
gevent.joinall([
gevent.spawn(gr1),
gevent.spawn(gr2),
gevent.spawn(gr3),
])
</pre>
<p></code>
<pre><code class="python">
Started Polling: at 0.0 seconds
Started Polling: at 0.0 seconds
Hey lets do some stuff while the greenlets poll, at 0.0 seconds
Ended Polling: at 2.0 seconds
Ended Polling: at 2.0 seconds
</pre></code></p>
<p>下面是另外一个多少有点人造色彩的例子,定义一个<em>非确定性的(non-deterministic)</em>
的<code>task</code>函数(给定相同输入的情况下,它的输出不保证相同)。
此例中执行这个函数的副作用就是,每次task在它的执行过程中都会随机地停某些秒。</p>
<pre><code class="python">
import gevent
import random
def task(pid):
"""
Some non-deterministic task
"""
gevent.sleep(random.randint(0,2)*0.001)
print('Task %s done' % pid)
def synchronous():
for i in range(1,10):
task(i)
def asynchronous():
threads = [gevent.spawn(task, i) for i in xrange(10)]
gevent.joinall(threads)
print('Synchronous:')
synchronous()
print('Asynchronous:')
asynchronous()
</pre>
<p></code>
<pre><code class="python">
Synchronous:
Task 1 done
Task 2 done
Task 3 done
Task 4 done
Task 5 done
Task 6 done
Task 7 done
Task 8 done
Task 9 done
Asynchronous:
Task 3 done
Task 7 done
Task 9 done
Task 2 done
Task 4 done
Task 1 done
Task 8 done
Task 6 done
Task 0 done
Task 5 done
</pre></code></p>
<p>上例中,在同步的部分,所有的task都同步的执行,
结果当每个task在执行时主流程被<em>阻塞</em>(主流程的执行暂时停住)。</p>
<p>程序的重要部分是将task函数封装到Greenlet内部线程的<code>gevent.spawn</code>。
初始化的greenlet列表存放在数组<code>threads</code>中,此数组被传给<code>gevent.joinall</code>
函数,后者阻塞当前流程,并执行所有给定的greenlet。执行流程只会在
所有greenlet执行完后才会继续向下走。</p>
<p>要重点留意的是,异步的部分本质上是随机的,而且异步部分的整体运行时间比同步
要大大减少。事实上,同步部分的最大运行时间,即是每个task停0.002秒,结果整个
队列要停0.02秒。而异步部分的最大运行时间大致为0.002秒,因为没有任何一个task会
阻塞其它task的执行。</p>
<p>一个更常见的应用场景,如异步地向服务器取数据,取数据操作的执行时间
依赖于发起取数据请求时远端服务器的负载,各个请求的执行时间会有差别。</p>
<pre><code class="python">import gevent.monkey
gevent.monkey.patch_socket()
import gevent
import urllib2
import simplejson as json
def fetch(pid):
response = urllib2.urlopen('http://ossepy.sinaapp.com/time.json')
# 原文链接(http://json-time.appspot.com/time.json)需翻墙,这里在SAE上自己搭了个
result = response.read()
json_result = json.loads(result)
datetime = json_result['datetime']
print('Process %s: %s' % (pid, datetime))
return json_result['datetime']
def synchronous():
for i in range(1,10):
fetch(i)
def asynchronous():
threads = []
for i in range(1,10):
threads.append(gevent.spawn(fetch, i))
gevent.joinall(threads)
print('...Synchronous...:')
synchronous()
print('...Asynchronous...:')
asynchronous()
</code>
</pre>
<h2 id="_4">确定性</h2>
<p>就像之前所提到的,greenlet具有确定性。在相同配置相同输入的情况下,它们总是
会产生相同的输出。下面就有例子,我们在multiprocessing的pool之间执行一系列的
任务,与在gevent的pool之间执行作比较。</p>
<pre>
<code class="python">
import time
def echo(i):
time.sleep(0.001)
return i
# Non Deterministic Process Pool
from multiprocessing.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]
print(run1 == run2 == run3 == run4)
# Deterministic Gevent Pool
from gevent.pool import Pool
p = Pool(10)
run1 = [a for a in p.imap_unordered(echo, xrange(10))]
run2 = [a for a in p.imap_unordered(echo, xrange(10))]
run3 = [a for a in p.imap_unordered(echo, xrange(10))]
run4 = [a for a in p.imap_unordered(echo, xrange(10))]
print(run1 == run2 == run3 == run4)
</code>
</pre>
<pre>
<code class="python">False
True</code>
</pre>
<p>即使gevent通常带有确定性,当开始与如socket或文件等外部服务交互时,
不确定性也可能溜进你的程序中。因此尽管gevent线程是一种“确定的并发”形式,
使用它仍然可能会遇到像使用POSIX线程或进程时遇到的那些问题。</p>
<p>涉及并发长期存在的问题就是<em>竞争条件(race condition)</em>。简单来说,
当两个并发线程/进程都依赖于某个共享资源同时都尝试去修改它的时候,
就会出现竞争条件。这会导致资源修改的结果状态依赖于时间和执行顺序。
这是个问题,我们一般会做很多努力尝试避免竞争条件,
因为它会导致整个程序行为变得不确定。</p>
<p>最好的办法是始终避免所有全局的状态。全局状态和导入时(import-time)副作用总是会
反咬你一口!</p>
<p class="comment"><strong>注释</strong>:TODO 这里解释一下imap, map还有确定性的原因。</p>
<h2 id="greenlets_1">创建Greenlets</h2>
<p>gevent对Greenlet初始化提供了一些封装,最常用的使用模板之一有</p>
<pre><code class="python">
import gevent
from gevent import Greenlet
def foo(message, n):
"""
Each thread will be passed the message, and n arguments
in its initialization.
"""
gevent.sleep(n)
print(message)
# Initialize a new Greenlet instance running the named function
# foo
thread1 = Greenlet.spawn(foo, "Hello", 1)
# Wrapper for creating and running a new Greenlet from the named
# function foo, with the passed arguments
thread2 = gevent.spawn(foo, "I live!", 2)
# Lambda expressions
thread3 = gevent.spawn(lambda x: (x+1), 2)
threads = [thread1, thread2, thread3]
# Block until all threads complete.
gevent.joinall(threads)
</pre>
<p></code>
<pre><code class="python">
Hello
I live!
</pre></code></p>
<p>除使用基本的Greenlet类之外,你也可以子类化Greenlet类,重载它的<code>_run</code>方法。</p>
<pre><code class="python">
import gevent
from gevent import Greenlet
class MyGreenlet(Greenlet):
def __init__(self, message, n):
Greenlet.__init__(self)
self.message = message
self.n = n
def _run(self):
print(self.message)
gevent.sleep(self.n)
g = MyGreenlet("Hi there!", 3)
g.start()
g.join()
</pre>
<p></code>
<pre><code class="python">
Hi there!
</pre></code></p>
<h2 id="greenlet">Greenlet状态</h2>
<p>就像任何其他成段代码,Greenlet也可能以不同的方式运行失败。
Greenlet可能未能成功抛出异常,不能停止运行,或消耗了太多的系统资源。</p>
<p>一个greenlet的状态通常是一个依赖于时间的参数。在greenlet中有一些标志,
让你可以监视它的线程内部状态:</p>
<ul>
<li><code>started</code> -- Boolean, 指示此Greenlet是否已经启动</li>
<li><code>ready()</code> -- Boolean, 指示此Greenlet是否已经停止</li>
<li><code>successful()</code> -- Boolean, 指示此Greenlet是否已经停止而且没抛异常</li>
<li><code>value</code> -- 任意值, 此Greenlet代码返回的值</li>
<li><code>exception</code> -- 异常, 此Greenlet内抛出的未捕获异常</li>
</ul>
<pre><code class="python">
import gevent
def win():
return 'You win!'
def fail():
raise Exception('You fail at failing.')
winner = gevent.spawn(win)
loser = gevent.spawn(fail)
print(winner.started) # True
print(loser.started) # True
# Exceptions raised in the Greenlet, stay inside the Greenlet.
try:
gevent.joinall([winner, loser])
except Exception as e:
print('This will never be reached')
print(winner.value) # 'You win!'
print(loser.value) # None
print(winner.ready()) # True
print(loser.ready()) # True
print(winner.successful()) # True
print(loser.successful()) # False
# The exception raised in fail, will not propogate outside the
# greenlet. A stack trace will be printed to stdout but it
# will not unwind the stack of the parent.
# fail函数中的异常不会影响(propogate)主函数。
# 异常报错的堆栈信息仍然会输出到标准输出中,
# 但是在主函数中的try-except并不能捕捉到greenlet中的异常。
print(loser.exception)
# It is possible though to raise the exception again outside
# raise loser.exception
# or with
# loser.get()
</pre>
<p></code>
<pre><code class="python">
True
True
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/gevent/greenlet.py", line 327, in run
result = self._run(*self.args, **self.kwargs)
File "ex.py", line 7, in fail
raise Exception('You fail at failing.')
Exception: You fail at failing.
<Greenlet at 0x7f56a0db67d0: fail> failed with Exception
You win!
None
True
True
True
False
You fail at failing.
</pre></code></p>
<h2 id="_5">程序停止</h2>
<p>当主程序(main program)收到一个SIGQUIT信号时,不能成功做yield操作的
Greenlet可能会令意外地挂起程序的执行。这导致了所谓的僵尸进程,
它需要在Python解释器之外被kill掉。</p>
<p>对此,一个通用的处理模式就是在主程序中监听SIGQUIT信号,在程序退出
调用<code>gevent.shutdown</code>。</p>
<pre>
<code class="python">import gevent
import signal
def run_forever():
gevent.sleep(1000)
if __name__ == '__main__':
gevent.signal(signal.SIGQUIT, gevent.kill)
thread = gevent.spawn(run_forever)
thread.join()
</code>
</pre>
<p class="comment"><strong>注释</strong>:TODO signal用法,这段代码的意义。</p>
<h2 id="_6">超时</h2>
<p>超时是一种对一块代码或一个Greenlet的运行时间的约束。</p>
<pre>
<code class="python">
import gevent
from gevent import Timeout
seconds = 10
timeout = Timeout(seconds)
timeout.start()
def wait():
gevent.sleep(10)
try:
gevent.spawn(wait).join()
except Timeout:
print('Could not complete')
</code>
</pre>
<p>超时类也可以用在上下文管理器(context manager)中, 也就是with语句内。</p>
<pre>
<code class="python">import gevent
from gevent import Timeout
time_to_wait = 5 # seconds
class TooLong(Exception):
pass
with Timeout(time_to_wait, TooLong):
gevent.sleep(10)
</code>
</pre>
<p>另外,对各种Greenlet和数据结构相关的调用,gevent也提供了超时参数。
例如:</p>
<pre><code class="python">
import gevent
from gevent import Timeout
def wait():
gevent.sleep(2)
timer = Timeout(1).start()
thread1 = gevent.spawn(wait)
try:
thread1.join(timeout=timer)
except Timeout:
print('Thread 1 timed out')
# --
timer = Timeout.start_new(1)
thread2 = gevent.spawn(wait)
try:
thread2.get(timeout=timer)
except Timeout:
print('Thread 2 timed out')
# --
try:
gevent.with_timeout(1, wait)
except Timeout:
print('Thread 3 timed out')
</pre>
<p></code>
<pre><code class="python">
Thread 1 timed out
Thread 2 timed out
Thread 3 timed out
</pre></code></p>
<h2 id="monkey-patching">猴子补丁(Monkey patching)</h2>
<p>我们现在来到gevent的死角了. 在此之前,我已经避免提到猴子补丁(monkey patching)
以尝试使gevent这个强大的协程模型变得生动有趣,但现在到了讨论猴子补丁的黑魔法的时候了。你之前可能注意到我们提到了<code>monkey.patch_socket()</code>这个命令,这个
纯粹副作用命令是用来改变标准socket库的。</p>
<pre>
<code class="python">import socket
print(socket.socket)
print("After monkey patch")
from gevent import monkey
monkey.patch_socket()
print(socket.socket)
import select
print(select.select)
monkey.patch_select()
print("After monkey patch")
print(select.select)
</code>
</pre>
<pre>
<code class="python">class 'socket.socket'
<class 'socket._socketobject'>
After monkey patch
<class 'gevent.socket.socket'>
<built-in function select>
After monkey patch
<function select at 0x7f2c5a816b18>
</code>
</pre>
<p>Python的运行环境允许我们在运行时修改大部分的对象,包括模块,类甚至函数。
这是个一般说来令人惊奇的坏主意,因为它创造了“隐式的副作用”,如果出现问题
它很多时候是极难调试的。虽然如此,在极端情况下当一个库需要修改Python本身
的基础行为的时候,猴子补丁就派上用场了。在这种情况下,gevent能够
修改标准库里面大部分的阻塞式系统调用,包括<code>socket</code>、<code>ssl</code>、<code>threading</code>和
<code>select</code>等模块,而变为协作式运行。</p>
<p>例如,Redis的python绑定一般使用常规的tcp socket来与<code>redis-server</code>实例通信。
通过简单地调用<code>gevent.monkey.patch_all()</code>,可以使得redis的绑定协作式的调度
请求,与gevent栈的其它部分一起工作。</p>
<p>这让我们可以将一般不能与gevent共同工作的库结合起来,而不用写哪怕一行代码。
虽然猴子补丁仍然是邪恶的(evil),但在这种情况下它是“有用的邪恶(useful evil)”。</p>
<h1 id="_7">数据结构</h1>
<h2 id="_8">事件</h2>
<p>事件(event)是一个在Greenlet之间异步通信的形式。</p>
<pre>
<code class="python">import gevent
from gevent.event import Event
'''
Illustrates the use of events
'''
evt = Event()
def setter():
'''After 3 seconds, wake all threads waiting on the value of evt'''
print('A: Hey wait for me, I have to do something')
gevent.sleep(3)
print("Ok, I'm done")
evt.set()
def waiter():
'''After 3 seconds the get call will unblock'''
print("I'll wait for you")
evt.wait() # blocking
print("It's about time")
def main():
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter),
gevent.spawn(waiter)
])
if __name__ == '__main__': main()
</code>
</pre>
<p>事件对象的一个扩展是AsyncResult,它允许你在唤醒调用上附加一个值。
它有时也被称作是future或defered,因为它持有一个指向将来任意时间可设置
为任何值的引用。</p>
<pre>
<code class="python">import gevent
from gevent.event import AsyncResult
a = AsyncResult()
def setter():
"""
After 3 seconds set the result of a.
"""
gevent.sleep(3)
a.set('Hello!')
def waiter():
"""
After 3 seconds the get call will unblock after the setter
puts a value into the AsyncResult.
"""
print(a.get())
gevent.joinall([
gevent.spawn(setter),
gevent.spawn(waiter),
])
</code>
</pre>
<h2 id="_9">队列(Queue)</h2>
<p>队列是一个排序的数据集合,它有常见的<code>put</code> / <code>get</code>操作,
但是它是以在Greenlet之间可以安全操作的方式来实现的。</p>
<p>举例来说,如果一个Greenlet从队列中取出一项,此项就不会被
同时执行的其它Greenlet再取到了。</p>
<pre><code class="python">
import gevent
from gevent.queue import Queue
tasks = Queue()
def worker(n):
while not tasks.empty():
task = tasks.get()
print('Worker %s got task %s' % (n, task))
gevent.sleep(0)
print('Quitting time!')
def boss():
for i in xrange(1,25):
tasks.put_nowait(i)
gevent.spawn(boss).join()
gevent.joinall([
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'nancy'),
])
</pre>
<p></code>
<pre><code class="python">
Worker steve got task 1
Worker john got task 2
Worker nancy got task 3
Worker steve got task 4
Worker nancy got task 5
Worker john got task 6
Worker steve got task 7
Worker john got task 8
Worker nancy got task 9
Worker steve got task 10
Worker nancy got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker nancy got task 15
Worker steve got task 16
Worker nancy got task 17
Worker john got task 18
Worker steve got task 19
Worker john got task 20
Worker nancy got task 21
Worker steve got task 22
Worker nancy got task 23
Worker john got task 24
Quitting time!
Quitting time!
Quitting time!
</pre></code></p>
<p>如果需要,队列也可以阻塞在<code>put</code>或<code>get</code>操作上。</p>
<p><code>put</code>和<code>get</code>操作都有非阻塞的版本,<code>put_nowait</code>和<code>get_nowait</code>不会阻塞,
然而在操作不能完成时抛出<code>gevent.queue.Empty</code>或<code>gevent.queue.Full</code>异常。</p>
<p>在下面例子中,我们让boss与多个worker同时运行,并限制了queue不能放入多于3个元素。
这个限制意味着,直到queue有空余空间之间,<code>put</code>操作会被阻塞。相反地,如果队列中
没有元素,<code>get</code>操作会被阻塞。它同时带一个timeout参数,允许在超时时间内如果
队列没有元素无法完成操作就抛出<code>gevent.queue.Empty</code>异常。</p>
<pre><code class="python">
import gevent
from gevent.queue import Queue, Empty
tasks = Queue(maxsize=3)
def worker(n):
try:
while True:
task = tasks.get(timeout=1) # decrements queue size by 1
print('Worker %s got task %s' % (n, task))
gevent.sleep(0)
except Empty:
print('Quitting time!')
def boss():
"""
Boss will wait to hand out work until a individual worker is
free since the maxsize of the task queue is 3.
"""
for i in xrange(1,10):
tasks.put(i)
print('Assigned all work in iteration 1')
for i in xrange(10,20):
tasks.put(i)
print('Assigned all work in iteration 2')
gevent.joinall([
gevent.spawn(boss),
gevent.spawn(worker, 'steve'),
gevent.spawn(worker, 'john'),
gevent.spawn(worker, 'bob'),
])
</pre>
<p></code>
<pre><code class="python">
Worker steve got task 1
Worker john got task 2
Worker bob got task 3
Worker steve got task 4
Worker bob got task 5
Worker john got task 6
Assigned all work in iteration 1
Worker steve got task 7
Worker john got task 8
Worker bob got task 9
Worker steve got task 10
Worker bob got task 11
Worker john got task 12
Worker steve got task 13
Worker john got task 14
Worker bob got task 15
Worker steve got task 16
Worker bob got task 17
Worker john got task 18
Assigned all work in iteration 2
Worker steve got task 19
Quitting time!
Quitting time!
Quitting time!
</pre></code></p>
<h2 id="_10">组(Group)和池(Pool)</h2>
<p>组(group)是一个运行中greenlet的集合,集合中的greenlet像一个组一样
会被共同管理和调度。 它也兼饰了像Python的<code>multiprocessing</code>库那样的
平行调度器的角色。</p>
<pre><code class="python">
import gevent
from gevent.pool import Group
def talk(msg):
for i in xrange(3):
print(msg)
g1 = gevent.spawn(talk, 'bar')
g2 = gevent.spawn(talk, 'foo')
g3 = gevent.spawn(talk, 'fizz')
group = Group()
group.add(g1)
group.add(g2)
group.join()
group.add(g3)
group.join()
</pre>
<p></code>
<pre><code class="python">
bar
bar
bar
foo
foo
foo
fizz
fizz
fizz
</pre></code></p>
<p>在管理异步任务的分组上它是非常有用的。</p>
<p>就像上面所说,<code>Group</code>也以不同的方式为分组greenlet/分发工作和收集它们的结果也提供了API。</p>
<pre><code class="python">
import gevent
from gevent import getcurrent
from gevent.pool import Group
group = Group()
def hello_from(n):
print('Size of group %s' % len(group))
print('Hello from Greenlet %s' % id(getcurrent()))
group.map(hello_from, xrange(3))
def intensive(n):
gevent.sleep(3 - n)
return 'task', n
print('Ordered')
ogroup = Group()
for i in ogroup.imap(intensive, xrange(3)):
print(i)
print('Unordered')
igroup = Group()
for i in igroup.imap_unordered(intensive, xrange(3)):
print(i)
</pre>
<p></code>
<pre><code class="python">
Size of group 3
Hello from Greenlet 31048720
Size of group 3
Hello from Greenlet 31049200
Size of group 3
Hello from Greenlet 31049040
Ordered
('task', 0)
('task', 1)
('task', 2)
Unordered
('task', 2)
('task', 1)
('task', 0)
</pre></code></p>
<p>池(pool)是一个为处理数量变化并且需要限制并发的greenlet而设计的结构。
在需要并行地做很多受限于网络和IO的任务时常常需要用到它。</p>
<pre><code class="python">
import gevent
from gevent.pool import Pool
pool = Pool(2)
def hello_from(n):
print('Size of pool %s' % len(pool))
pool.map(hello_from, xrange(3))
</pre>
<p></code>
<pre><code class="python">
Size of pool 2
Size of pool 2
Size of pool 1
</pre></code></p>
<p>当构造gevent驱动的服务时,经常会将围绕一个池结构的整个服务作为中心。
一个例子就是在各个socket上轮询的类。</p>
<pre>
<code class="python">from gevent.pool import Pool
class SocketPool(object):
def __init__(self):
self.pool = Pool(1000)
self.pool.start()
def listen(self, socket):
while True:
socket.recv()
def add_handler(self, socket):
if self.pool.full():
raise Exception("At maximum pool size")
else:
self.pool.spawn(self.listen, socket)
def shutdown(self):
self.pool.kill()
</code>
</pre>
<h2 id="_11">锁和信号量</h2>
<p>信号量是一个允许greenlet相互合作,限制并发访问或运行的低层次的同步原语。
信号量有两个方法,<code>acquire</code>和<code>release</code>。在信号量是否已经被
acquire或release,和拥有资源的数量之间不同,被称为此信号量的范围
(the bound of the semaphore)。如果一个信号量的范围已经降低到0,它会
阻塞acquire操作直到另一个已经获得信号量的greenlet作出释放。</p>
<pre><code class="python">
from gevent import sleep
from gevent.pool import Pool
from gevent.coros import BoundedSemaphore
sem = BoundedSemaphore(2)
def worker1(n):
sem.acquire()
print('Worker %i acquired semaphore' % n)
sleep(0)
sem.release()
print('Worker %i released semaphore' % n)
def worker2(n):
with sem:
print('Worker %i acquired semaphore' % n)
sleep(0)
print('Worker %i released semaphore' % n)
pool = Pool()
pool.map(worker1, xrange(0,2))
pool.map(worker2, xrange(3,6))
</pre>