-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathfloodsub.ss
62 lines (54 loc) · 1.57 KB
/
floodsub.ss
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
;; © vyzo
;; floodsub
(import :std/actor
:std/sugar
:std/iter
:vyzo/simsub/proto
:vyzo/simsub/env)
(export #t)
(def (floodsub _ receive initial-peers rng ready! go!)
(def messages (make-hash-table-eqv))
(def peers [])
(def (connect new-peers)
(let (new-peers (filter (lambda (peer) (not (memq peer peers)))
new-peers))
(for (peer new-peers)
(send! (!!pubsub.connect peer)))
(set! peers
(foldl cons peers new-peers))))
(def (connect-complete)
(let lp ()
(<- ((!pubsub.connect)
(unless (memq @source peers)
(set! peers (cons @source peers)))
(lp))
(else (void)))))
(def (loop)
(<- ((!pubsub.connect)
(unless (memq @source peers)
(set! peers (cons @source peers))))
((!pubsub.publish id msg)
(hash-put! messages id msg)
;; deliver
(receive id msg)
;; and forward
(for (peer (shuffle/normalize peers rng))
(send! (!!pubsub.message peer id msg))))
((!pubsub.message id msg)
(unless (hash-get messages id) ; seen?
(hash-put! messages id msg)
;; deliver
(receive id msg)
;; and forward
(for (peer (shuffle/normalize peers rng))
(unless (eq? @source peer)
(send! (!!pubsub.message peer id msg)))))))
(loop))
(try
(connect initial-peers)
(ready!)
(connect-complete)
(go!)
(loop)
(catch (e)
(errorf "unhandled exception: ~a" e))))