-
Notifications
You must be signed in to change notification settings - Fork 6
/
watch.ss
157 lines (140 loc) · 7.46 KB
/
watch.ss
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
;; General-purpose Ethereum Blockchain Watcher
(export #t)
(import
:gerbil/gambit
:std/misc/number :std/net/json-rpc :std/sugar :std/srfi/1
:clan/concurrency :clan/exception :clan/failure :clan/option :clan/timestamp
:clan/poo/object :clan/poo/brace
:clan/persist/persist
./types ./ethereum ./network-config ./json-rpc)
;; TODO: Handle "query returned more than 1000 results" when too many contracts created in interval!!!
;; TODO: Support watching multiple Ethereum-like networks in one image
;; Use "paging_options" https://explorer.energyweb.org/eth-rpc-api-docs -- available on geth???
;; Wait until at least target-block has been confirmed,
;; NB: due to polling intervals, current-block might be after our target-block
;; : BlockNumber <- BlockNumber
(def (wait-until-block target-block)
(let loop ()
(def current-block (eth_blockNumber))
(unless (>= current-block target-block)
(thread-sleep! (ethereum-block-polling-period-in-seconds))
(loop))
current-block))
;; Wait until the unix-timestamp for a block is greater or equal to given target.
;; : BlockNumber <- UnixTimestamp
(def (wait-until-block-unix-timestamp target-unix-timestamp)
(let loop ()
(def current-block (eth_blockNumber))
(def polling-period (ethereum-block-polling-period-in-seconds))
(unless (>= target-unix-timestamp
(.@ (eth_getBlockByNumber current-block #f) timestamp))
(thread-sleep! (max polling-period
(- target-unix-timestamp
(current-unix-timestamp) polling-period 1)))
(loop))))
;; Have function f process in chronological order all log entries from contract at contract-address
;; from from-block to to-block (included), starting at the (next-event)th event in from-block.
;; If some blocks are in the future, wait until they happen to return.
;; Only process a block after a sufficient number of confirmations have passed.
;; Function f may throw and/or use continuations to cause an early exit.
;; https://infura.io/docs/ethereum/json-rpc/eth-getLogs
;; : <- (Fun <- LogObject) Address BlockNumber BlockNumber UInt confirmations: ?UInt
(def (watch-contract f contract-address from-block to-block (next-event 0)
confirmations: (confirmations (ethereum-confirmations-wanted-in-blocks)))
(let loop ((start-block from-block) (next-event next-event))
(when (<= start-block to-block)
(let* ((current-block (wait-until-block (+ start-block confirmations)))
(confirmed-block (- current-block confirmations))
(end-block (min to-block confirmed-block)))
;; Get logs
(get-logs-from-blocks f contract-address start-block end-block next-event)
(loop (1+ end-block) 0)))))
;; Request logs between indicated blocks (inclusive).
;; Gets all logs between from-block and to-block (or starting a part-block)
;; Re-partitioning the getLogs request if we timeout / have too many logs in the repsonse.
;; : (Fun <- LogObject) (Maybe Address) BlockNumber BlockNumber ?UInt
(def (get-logs-from-blocks f contract-address
from-block to-block (next-event 0))
(when (>= to-block from-block)
(match (with-result (eth_getLogs {address: contract-address
fromBlock: from-block
toBlock: to-block}))
((some l)
(for-each (lambda (o)
(unless (and (= from-block (.@ o blockNumber)) (< (.@ o logIndex) next-event))
(f o)))
l))
((failure err)
(if (and (json-rpc-error? err) (equal? (json-rpc-error-code err) -32005)
(< from-block to-block))
;; Recoverable error: Too many logs or query timed out.
;; Recovery strategy: Repartition to decrease request size.
;; See: https://infura.io/docs/ethereum/json-rpc/eth-getLogs#limitations
;; TODO: Test this.
(let (part-block (half (+ from-block to-block)))
(get-logs-from-blocks f contract-address from-block part-block next-event)
(get-logs-from-blocks f contract-address (1+ part-block) to-block 0))
;; Unrecoverable errors: raise error
(raise err)))))) ;; Too many logs in the same block. TODO: Add contract-address from-block ?
#| ;;; The code below was hand-translated from my previous CPS-based client JavaScript.
;;; It is untested and most probably needs to be rewritten / refactored in more direct style
;;; with more transactional persistence.
;; : Quantity
(define-persistent-variable next-unprocessed-block UInt "ETH.nextUnprocessedBlock" 0)
;; : (Table (Fun <- Quantity) <- String)
(def new-block-hooks (make-hash-table))
;; Process new blocks.
;; : Unit <-
(def (process-new-blocks)
(def current-block (eth_blockNumber))
(with-logged-exceptions ()
(when (>= current-block (next-unprocessed-block))
(hash-for-each (lambda (_ hook) (hook (next-unprocessed-block) current-block)) new-block-hooks)
(set! (next-unprocessed-block) (1+ current-block)))))
;; : Unit <-
(def (watchBlockchain)
(spawn/name/logged
"watch-ethereum-blockchain"
(while #t
(process-new-blocks)
(thread-sleep! (ethereum-block-polling-period-in-seconds)))))
;; hook to synchronously watch all events of some kind as the chain keeps getting updated
(def (process-events filter process)
(lambda (fromBlock toBlock)
(set! fromBlock (max fromBlock 0))
(when (or (not (real? toBlock)) (<= fromBlock toBlock))
(for-each process (eth_getLogs (.cc filter fromBlock: fromBlock toBlock: toBlock))))))
;; Register a confirmed event hook.
;; NB: *IF* some event has hooks for both confirmed and unconfirmed events, then
;; (1) the confirmed event hook must be registered *before*, and
;; (2) the name of the confirmed event hook must be lexicographically strictly less
;; than the name for the corresponding unconfirmed event hook.
(def (register-confirmed-event-hook
name fromBlock filter process
(confirmations (ethereum-confirmations-wanted-in-blocks)))
(hash-put! new-block-hooks name
(lambda (first-unprocessed-block last-unprocessed-block)
((process-events filter process)
(- first-unprocessed-block confirmations)
(- last-unprocessed-block confirmations))))
((process-events filter process) fromBlock (1- (next-unprocessed-block))))
(def (register-unconfirmed-event-hook
name filter process
(confirmations (ethereum-confirmations-wanted-in-blocks)))
(def hook (lambda (first-unprocessed-block last-unprocessed-block)
((process-events filter process)
(max first-unprocessed-block (- last-unprocessed-block confirmations -1))
'latest)))
(hash-put! new-block-hooks name hook)
(def fromBlock (- (next-unprocessed-block) 1 confirmations))
(hook fromBlock 'latest))
;; The code in the section below might belong to some library to manage multiple interactions.
;; Managing interactions
;; runs up from 0, in the table of interactions
(define-persistent-variable next-interaction-id Integer "ETH.nextInteractionId" 0)
;; runs down from -1, in the same table of interactions
(define-persistent-variable previous-unconfirmed-id Integer "ETH.previousUnconfirmedId" -1)
(define-persistent-variable active-interactions UIntSet "ETH.activeInteractions" (.@ UIntSet .empty))
(define-type MapUIntFromDigest (Map UInt <- Digest))
(define-persistent-variable interactions-by-txhash MapUIntFromDigest "ETH.interactionsByTxhash" (.@ MapUIntFromDigest .empty))
|#