-
Notifications
You must be signed in to change notification settings - Fork 3
/
index.js
182 lines (151 loc) · 5.3 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
'use strict'
import fs from 'fs'
import path from 'path'
// Default value for `options.tolerance`, applied by `watch` only when the
// platform is win32. `watchNative` compares it against the difference between
// two file modification times (in milliseconds) to suppress duplicate reports.
const TOLERANCE = 200 // For ReadDirectoryChangesW() double reporting.
// Platforms on which `watch` uses `watchNative` by default; on any other
// platform `options.fallback` defaults to true and `watchFallback` is used.
const PLATFORMS = ['win32', 'darwin'] // Native recursive support.
// -----------------------------------------------------------------------------
// HELPERS AND POLYFILLS
// TODO: Refactor after fsPromises.watch is available in the next LTS version.
// -----------------------------------------------------------------------------
// Graceful closing.
/**
 * Creates a registry of active watchers keyed by the watched path.
 *
 * Every registered watcher is closed and removed when it emits `error`, when
 * `close(entityPath)` is called, or when `abortSignal` fires.
 *
 * @param {AbortSignal} [abortSignal] - Optional signal; aborting it closes all
 *   registered watchers.
 * @returns {{
 *   add: (entityPath: string, w: object) => void,
 *   has: (entityPath: string) => boolean,
 *   close: (entityPath: string) => void
 * }} Registry operations. `w` must expose `on(event, listener)` and `close()`.
 */
function createWatchers (abortSignal) {
  // A Map (not a plain object) so that paths such as "constructor" or
  // "toString" are never mistaken for already registered watchers.
  const watchers = new Map()

  // Close all watchers on abort.
  abortSignal && abortSignal.addEventListener('abort', () => {
    // Iterate over a snapshot because close() mutates the registry.
    for (const entityPath of [...watchers.keys()]) {
      close(entityPath)
    }
  })

  function has (entityPath) {
    return watchers.has(entityPath)
  }

  function add (entityPath, w) {
    // The first watcher registered for a path wins.
    if (has(entityPath)) {
      return
    }
    // The abort listener has already run; registering now would leak the
    // watcher, so release it right away instead.
    if (abortSignal && abortSignal.aborted) {
      w.close()
      return
    }
    watchers.set(entityPath, w)
    // A failing watcher is closed and forgotten so the path can be re-added.
    w.on('error', () => close(entityPath))
  }

  function close (entityPath) {
    const w = watchers.get(entityPath)
    if (w) {
      w.close()
      watchers.delete(entityPath)
    }
  }

  return { add, has, close }
}
// Channel support for async generators.
/**
 * Creates an unbounded FIFO channel connecting callback-style producers with
 * promise-based consumers.
 *
 * @param {AbortSignal} [abortSignal] - Optional signal. When it aborts, every
 *   pending `take()` resolves with `undefined`; later `take()` calls on an
 *   empty channel also resolve with `undefined` instead of waiting.
 * @returns {{ put: (msg: any) => void, take: () => Promise<any> }}
 *   `put` delivers a message to the oldest waiting taker or queues it;
 *   `take` resolves with the oldest queued message or waits for the next one.
 */
function createChannel (abortSignal) {
  const messageQueue = []
  const takerQueue = []

  abortSignal && abortSignal.addEventListener('abort', () => {
    // Release every waiting taker, not just the oldest one, so that no
    // consumer is left awaiting a promise that can never settle.
    for (const resolveTaker of takerQueue.splice(0)) {
      resolveTaker()
    }
  })

  function put (msg) {
    // Anyone waiting for a message?
    if (takerQueue.length) {
      // Deliver the message to the oldest one waiting (FIFO).
      const resolveTaker = takerQueue.shift()
      resolveTaker(msg)
    } else {
      // No one is waiting - queue the message.
      messageQueue.push(msg)
    }
  }

  function take () {
    // Deliver the oldest queued message, if any.
    if (messageQueue.length) {
      return Promise.resolve(messageQueue.shift())
    }
    // The abort listener has already run, so nothing would ever wake this
    // taker up - resolve right away with the same "aborted" value.
    if (abortSignal && abortSignal.aborted) {
      return Promise.resolve()
    }
    // No queued messages - queue the taker until a message arrives.
    return new Promise(resolve => takerQueue.push(resolve))
  }

  return { put, take }
}
// -----------------------------------------------------------------------------
// WATCHERS
// -----------------------------------------------------------------------------
// Native watcher built directly on fs.watch().
/**
 * Starts an `fs.watch` watcher on `pathToWatch` and reports changed paths.
 *
 * Does nothing when `options.watchers` already has a watcher for the path.
 * Otherwise the new watcher is registered in `options.watchers`.
 *
 * @param {string} pathToWatch - Path handed to `fs.watch`.
 * @param {object} options
 * @param {boolean} [options.recursive=true] - Forwarded to `fs.watch`.
 * @param {number} options.tolerance - A repeated report for the same path is
 *   dropped unless the file's mtime advanced by at least this many ms.
 * @param {object} options.watchers - Registry exposing `has` and `add`.
 * @param {(filePath: string) => void} callback - Called with the changed path.
 * @returns {void}
 */
function watchNative (pathToWatch, options, callback) {
  // Nothing to do when this path is already being watched.
  if (options.watchers.has(pathToWatch)) {
    return
  }

  // Path and mtime of the most recently handled event.
  const previous = { filePath: null, timestamp: 0 }
  const watchOptions = { recursive: options.recursive ?? true }

  const onChange = async (event, fileName) => {
    // On Windows fileName may be empty; treat that as a change of the
    // watched path itself.
    const changedPath = fileName ? path.join(pathToWatch, fileName) : pathToWatch
    try {
      const { mtime } = await fs.promises.stat(changedPath)
      const modifiedAt = new Date(mtime).getTime()
      const advancedEnough = modifiedAt - previous.timestamp >= options.tolerance
      const isSameFile = changedPath === previous.filePath
      // Remember this event before deciding whether to report it.
      Object.assign(previous, { filePath: changedPath, timestamp: modifiedAt })
      // Same path again without a sufficient mtime advance: duplicate report.
      const isDuplicate = isSameFile && !advancedEnough
      if (isDuplicate) {
        return
      }
      callback(changedPath)
    } catch (err) {
      // stat() failed - the file was most likely deleted; report it anyway.
      callback(changedPath)
    }
  }

  const watcher = fs.watch(pathToWatch, watchOptions, onChange)
  options.watchers.add(pathToWatch, watcher)
}
// Fallback recursive watcher.
/**
 * Emulates recursive watching by attaching a shallow (non-recursive)
 * `watchNative` watcher to `pathToWatch` and to every directory below it.
 *
 * When a reported path turns out to be a directory, its subtree is scanned
 * again so newly created directories get watchers too (`watchNative` skips
 * paths that are already watched). When the reported path can no longer be
 * stat'ed, its watcher is closed.
 *
 * @param {string} pathToWatch - Root directory; must be readable with
 *   `fs.readdirSync`, otherwise the error propagates.
 * @param {object} options - Passed on to `watchNative` with `recursive: false`;
 *   `options.watchers.close` is used to drop watchers.
 * @param {(entityPath: string) => void} callback - Called with changed paths.
 * @returns {void}
 */
function watchFallback (pathToWatch, options, callback) {
  // The list grows while it is iterated, giving a breadth-first traversal.
  const dirs = [pathToWatch]
  for (const dir of dirs) {
    // Append dirs with descendants.
    for (const entityName of fs.readdirSync(dir)) {
      const entityPath = path.join(dir, entityName)
      if (isDirectorySync(entityPath)) {
        dirs.push(entityPath)
      }
    }
    // Shallow watch using native watcher.
    watchNative(dir, { ...options, recursive: false }, async entityPath => {
      try {
        const stat = await fs.promises.stat(entityPath)
        // Watch newly created directory.
        if (stat.isDirectory()) {
          watchFallback(entityPath, options, callback)
        }
      } catch (err) {
        // Close watcher for deleted directory.
        options.watchers.close(entityPath)
      }
      callback(entityPath)
    })
  }
}

// Tells whether entityPath is a directory; entries that no longer resolve
// (deleted since readdir, dangling symlink) count as "not a directory".
function isDirectorySync (entityPath) {
  try {
    return fs.statSync(entityPath).isDirectory()
  } catch (err) {
    if (err.code === 'ENOENT') {
      return false
    }
    throw err
  }
}
/**
 * Watches one or more paths and yields the path of every reported change.
 *
 * All watchers are closed when `options.signal` aborts, when the consumer
 * stops iterating (`break`, `return()`, `throw()`), or when setting up a
 * watcher fails. The `options` object passed in is not modified.
 *
 * @param {string|string[]} pathsToWatch - A path or an array of paths.
 * @param {object} [options]
 * @param {AbortSignal} [options.signal] - Aborting it ends the iteration.
 * @param {number} [options.tolerance] - De-duplication window in ms; defaults
 *   to `TOLERANCE` on win32 and to 0 elsewhere.
 * @param {boolean} [options.fallback] - Force (true) or forbid (false) the
 *   per-directory fallback watcher; defaults to true on platforms that are
 *   not listed in `PLATFORMS`.
 * @yields {string} Path of a changed file or directory.
 */
export default async function * watch (pathsToWatch, options = {}) {
  // Normalize paths to array.
  const paths = Array.isArray(pathsToWatch) ? pathsToWatch : [pathsToWatch]
  const userSignal = options.signal

  // Internal abort source: fired by the caller's signal and, unconditionally,
  // when this generator finishes, so the watchers never outlive the iteration.
  const controller = new AbortController()
  const forwardAbort = () => controller.abort()
  userSignal?.addEventListener('abort', forwardAbort, { once: true })

  // Work on a copy so that one options object can be shared between calls
  // without the calls overwriting each other's watcher registry.
  const settings = {
    ...options,
    // Set default tolerance for Windows.
    tolerance: options.tolerance ?? (process.platform === 'win32' ? TOLERANCE : 0),
    // Use the fallback where fs.watch() has no native recursive support.
    fallback: options.fallback ?? !PLATFORMS.includes(process.platform),
    // Create watchers registry.
    watchers: createWatchers(controller.signal)
  }
  const watchFunction = settings.fallback ? watchFallback : watchNative

  // Watcher callbacks put results to the channel; the loop below takes them.
  const channel = createChannel(controller.signal)

  try {
    for (const pathToWatch of paths) {
      watchFunction(path.normalize(pathToWatch), settings, changedPath => {
        channel.put(changedPath)
      })
    }
    // Yield changes until aborted.
    while (!userSignal?.aborted) {
      const changedPath = await channel.take()
      if (changedPath) { // Path will be undefined when watch is aborted.
        yield changedPath
      }
    }
  } finally {
    userSignal?.removeEventListener('abort', forwardAbort)
    // Closes every watcher and releases pending takers; no-op if already aborted.
    controller.abort()
  }
}