This repository has been archived by the owner on Jul 8, 2024. It is now read-only.
forked from majkelcc/hal
-
Notifications
You must be signed in to change notification settings - Fork 1
/
unison-fsmonitor
executable file
·284 lines (261 loc) · 9.64 KB
/
unison-fsmonitor
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
#!/usr/bin/env python
#
# unox
#
# Author: Hannes Landeholm <[email protected]>
#
# The Unison beta (2.48) comes with file system change monitoring (repeat = watch)
# through an abstract "unison-fsmonitor" adapter that integrates with each respective
# OS file update watch interface. This allows responsive dropbox like master-master sync
# of files over SSH. The Unison beta comes with an adapter for Windows and Linux but
# unfortunately lacks one for OS X.
#
# This script implements the Unison fswatch protocol (see /src/fswatch.ml)
# and is intended to be installed as unison-fsmonitor in the PATH in OS X. This is the
# missing puzzle piece for repeat = watch support for Unison in in OS X.
#
# Dependencies: pip install macfsevents
#
# Licence: MPLv2 (https://www.mozilla.org/MPL/2.0/)
import sys
import os
import time
import fsevents
import urllib
import traceback
import signal
import sys
def signal_handler(signal, frame):
print('You pressed Ctrl+C!')
sys.exit(0)
signal.signal(signal.SIGINT, signal_handler)
my_log_prefix = "[unox]"
_in_debug = "--debug" in sys.argv
_in_debug_plus = False
# Global MacFSEvents observer.
observer = fsevents.Observer()
observer.start()
# Dict of monitored replicas.
# Replica hash mapped to fsevents.Stream objects.
replicas = {}
# Dict of pending replicas that are beeing waited on.
# Replica hash mapped to True if replica is pending.
pending_reps = {}
# Dict of triggered replicas.
# Replica hash mapped to recursive dict where keys are path tokens or True for pending leaf.
triggered_reps = {}
def format_exception(e):
# Thanks for not bundling this function in the Python library Guido. *facepalm*
exception_list = traceback.format_stack()
exception_list = exception_list[:-2]
exception_list.extend(traceback.format_tb(sys.exc_info()[2]))
exception_list.extend(traceback.format_exception_only(sys.exc_info()[0], sys.exc_info()[1]))
exception_str = "Traceback (most recent call last):\n"
exception_str += "".join(exception_list)
exception_str = exception_str[:-1]
return exception_str
def _debug_triggers():
global pending_reps, triggered_reps
if not _in_debug_plus:
return
wait_info = ""
if len(pending_reps) > 0:
wait_info = " | wait=" + str(pending_reps)
sys.stderr.write(my_log_prefix + "[DEBUG+]: trig=" + str(triggered_reps) + wait_info + "\n")
def _debug(msg):
sys.stderr.write(my_log_prefix + "[DEBUG]: " + msg.strip() + "\n")
def warn(msg):
sys.stderr.write(my_log_prefix + "[WARN]: " + msg.strip() + "\n")
def sendCmd(cmd, args):
raw_cmd = cmd
for arg in args:
raw_cmd += " " + urllib.quote(arg);
if _in_debug: _debug("sendCmd: " + raw_cmd)
sys.stdout.write(raw_cmd + "\n")
# Safely injects a command to send from non-receive context.
def injectCmd(cmd, args):
sendCmd(cmd, args)
sys.stdout.flush()
def sendAck():
sendCmd("OK", [])
def sendError(msg):
sendCmd("ERROR", [msg])
os._exit(1)
def recvCmd():
# We flush before stalling on read instead of
# flushing every write for optimization purposes.
sys.stdout.flush()
line = sys.stdin.readline()
if not line.endswith("\n"):
# End of stream means we're done.
if _in_debug: _debug("stdin closed, exiting")
sys.exit(0)
if _in_debug: _debug("recvCmd: " + line)
# Parse cmd and args. Args are url encoded.
words = line.strip().split(" ")
args = []
for word in words[1:]:
args.append(urllib.unquote(word))
return [words[0], args]
def pathTokenize(path):
path_toks = []
for path_tok in path.split("/"):
if len(path_tok) > 0:
path_toks.append(path_tok)
return path_toks
def triggerReplica(replica, local_path_toks):
global pending_reps, triggered_reps
if replica in pending_reps:
# Got event for pending replica, notify and reset wait.
injectCmd("CHANGES", [replica])
pending_reps = {}
# Handle root.
if len(local_path_toks) == 0:
triggered_reps[replica] = True
return
elif not replica in triggered_reps:
cur_lvl = {}
triggered_reps[replica] = cur_lvl
else:
cur_lvl = triggered_reps[replica]
# Iterate through branches.
for branch_path_tok in local_path_toks[:len(local_path_toks) - 1]:
if cur_lvl == True:
return
if not branch_path_tok in cur_lvl:
new_lvl = {}
cur_lvl[branch_path_tok] = new_lvl
else:
new_lvl = cur_lvl[branch_path_tok]
cur_lvl = new_lvl
# Handle leaf.
if cur_lvl == True:
return
leaf_path_tok = local_path_toks[len(local_path_toks) - 1]
cur_lvl[leaf_path_tok] = True
_debug_triggers()
# Starts monitoring of a replica.
def startReplicaMon(replica, fspath, path):
global replicas, observer
if not replica in replicas:
# Ensure fspath has trailing slash.
fspath = os.path.join(fspath, "")
if _in_debug: _debug("start monitoring of replica [" + replica + "] [" + fspath + "]")
def replicaFileEventCallback(path, mask):
try:
if not path.startswith(fspath):
return warn("unexpected file event at path [" + path + "] for [" + fspath + "]")
local_path = path[len(fspath):]
local_path_toks = pathTokenize(local_path)
if _in_debug: _debug("replica:[" + replica + "] file event @[" + local_path + "] (" + path + ")")
triggerReplica(replica, local_path_toks)
except Exception as e:
# Because python is a horrible language it has a special behavior for non-main threads that
# fails to catch an exception. Instead of crashing the process, only the thread is destroyed.
# We fix this with this catch all exception handler.
sys.stderr.write(format_exception(e))
sys.stderr.flush()
os._exit(1)
try:
# OS X has no interface for "file level" events. You would have to implement this manually in userspace,
# and compare against a snapshot. This means there's no point in us doing it, better leave it to Unison.
if _in_debug: _debug("replica:[" + replica + "] watching path [" + fspath + "]")
stream = fsevents.Stream(replicaFileEventCallback, fspath)
observer.schedule(stream)
except (FileNotFoundError, NotADirectoryError) as e:
sendError(str(e))
replicas[replica] = {
"stream": stream,
"fspath": fspath
}
sendAck()
while True:
[cmd, args] = recvCmd();
if cmd == "DIR":
sendAck()
elif cmd == "LINK":
sendError("link following is not supported by unison-watchdog, please disable this option (-links)")
elif cmd == "DONE":
return
else:
sendError("unexpected cmd in replica start: " + cmd)
def reportRecursiveChanges(local_path, cur_lvl):
if (cur_lvl == True):
sendCmd("RECURSIVE", [local_path])
return
for path_tok, new_lvl in cur_lvl.items():
reportRecursiveChanges(os.path.join(local_path, path_tok), new_lvl);
def main():
global replicas, pending_reps, triggered_reps
# Version handshake.
sendCmd("VERSION", ["1"])
[cmd, args] = recvCmd();
if cmd != "VERSION":
sendError("unexpected version cmd: " + cmd)
[v_no] = args
if v_no != "1":
warn("unexpected version: " + v_no)
# Start watch operation.
_debug_triggers()
while True:
[cmd, args] = recvCmd();
# Cancel pending waits when any other command is received.
if cmd != "WAIT":
pending_reps = {}
# Check command.
if cmd == "DEBUG":
_in_debug = True
elif cmd == "START":
# Start observing replica.
if len(args) >= 3:
[replica, fspath, path] = args
else:
# No path, only monitoring fspath.
[replica, fspath] = args
path = ""
startReplicaMon(replica, fspath, path)
elif cmd == "WAIT":
# Start waiting for another replica.
[replica] = args
if not replica in replicas:
sendError("unknown replica: " + replica)
if replica in triggered_reps:
# Is pre-triggered replica.
sendCmd("CHANGES", replica)
pending_reps = {}
else:
pending_reps[replica] = True
_debug_triggers()
elif cmd == "CHANGES":
# Get pending replicas.
[replica] = args
if not replica in replicas:
sendError("unknown replica: " + replica)
if replica in triggered_reps:
reportRecursiveChanges("", triggered_reps[replica])
del triggered_reps[replica]
sendCmd("DONE", [])
_debug_triggers()
elif cmd == "RESET":
# Stop observing replica.
[replica] = args
if not replica in replicas:
warn("unknown replica: " + replica)
continue
stream = replicas[replica]["stream"]
if stream is not None:
observer.unschedule(stream)
del replicas[replica]
if replica in triggered_reps:
del triggered_reps[replica]
_debug_triggers()
else:
sendError("unexpected root cmd: " + cmd)
if __name__ == '__main__':
try:
main()
finally:
for replica in replicas:
observer.unschedule(replicas[replica]["stream"])
observer.stop()
observer.join()