Skip to content

Commit 84a833e

Browse files
authoredAug 6, 2020
Merge pull request #170 from webpack/bugfix/watcher-limit
Merge watchers
2 parents 7cb31ca + 224fc1f commit 84a833e

11 files changed

+872
-181
lines changed
 

‎.travis.yml

+9-5
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,24 @@ stages:
99
matrix:
1010
include:
1111
- os: linux
12-
node_js: "12"
12+
node_js: "14"
1313
stage: basic
1414
- os: osx
15-
node_js: "12"
15+
node_js: "14"
1616
stage: basic
17+
- os: osx
18+
node_js: "14"
19+
env: WATCHPACK_WATCHER_LIMIT=1
20+
stage: advanced
1721
- os: linux
18-
node_js: "12"
22+
node_js: "14"
1923
env: WATCHPACK_POLLING=200
2024
stage: advanced
2125
- os: linux
22-
node_js: "13"
26+
node_js: "12"
2327
stage: advanced
2428
- os: linux
25-
node_js: "13"
29+
node_js: "12"
2630
env: WATCHPACK_POLLING=200
2731
stage: advanced
2832
- os: linux

‎appveyor.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ cache:
1010
# what combinations to test
1111
environment:
1212
matrix:
13-
- nodejs_version: 13
13+
- nodejs_version: 14
1414
- nodejs_version: 12
1515
- nodejs_version: 10
1616

‎azure-pipelines.yml

+6-6
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ jobs:
55
strategy:
66
maxParallel: 3
77
matrix:
8-
node-13:
9-
node_version: ^13.0.0
8+
node-14:
9+
node_version: ^14.0.0
1010
node-12:
1111
node_version: ^12.4.0
1212
node-10:
@@ -42,8 +42,8 @@ jobs:
4242
strategy:
4343
maxParallel: 3
4444
matrix:
45-
node-13:
46-
node_version: ^13.0.0
45+
node-14:
46+
node_version: ^14.0.0
4747
node-12:
4848
node_version: ^12.4.0
4949
node-10:
@@ -79,8 +79,8 @@ jobs:
7979
strategy:
8080
maxParallel: 3
8181
matrix:
82-
node-13:
83-
node_version: ^13.0.0
82+
node-14:
83+
node_version: ^14.0.0
8484
node-12:
8585
node_version: ^12.4.0
8686
node-10:

‎lib/DirectoryWatcher.js

+125-105
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ const EventEmitter = require("events").EventEmitter;
88
const fs = require("graceful-fs");
99
const path = require("path");
1010

11+
const watchEventSource = require("./watchEventSource");
12+
1113
const EXISTANCE_ONLY_TIME_ENTRY = Object.freeze({});
1214

1315
let FS_ACCURACY = 1000;
@@ -77,6 +79,7 @@ class DirectoryWatcher extends EventEmitter {
7779
: options.poll
7880
? 5007
7981
: false;
82+
this.timeout = undefined;
8083
this.initialScanRemoved = new Set();
8184
this.initialScanFinished = undefined;
8285
/** @type {Map<string, Set<Watcher>>} */
@@ -102,20 +105,19 @@ class DirectoryWatcher extends EventEmitter {
102105
createWatcher() {
103106
try {
104107
if (this.polledWatching) {
105-
// Poll for changes
106-
const interval = setInterval(() => {
107-
this.doScan(false);
108-
}, this.polledWatching);
109108
this.watcher = {
110109
close: () => {
111-
clearInterval(interval);
110+
if (this.timeout) {
111+
clearTimeout(this.timeout);
112+
this.timeout = undefined;
113+
}
112114
}
113115
};
114116
} else {
115117
if (IS_OSX) {
116118
this.watchInParentDirectory();
117119
}
118-
this.watcher = fs.watch(this.path);
120+
this.watcher = watchEventSource.watch(this.path);
119121
this.watcher.on("change", this.onWatchEvent.bind(this));
120122
this.watcher.on("error", this.onWatcherError.bind(this));
121123
}
@@ -464,6 +466,16 @@ class DirectoryWatcher extends EventEmitter {
464466
if (err) {
465467
console.error("Watchpack Error (initial scan): " + err);
466468
}
469+
this.onScanFinished();
470+
}
471+
472+
onScanFinished() {
473+
if (this.polledWatching) {
474+
this.timeout = setTimeout(() => {
475+
if (this.closed) return;
476+
this.doScan(false);
477+
}, this.polledWatching);
478+
}
467479
}
468480

469481
onDirectoryRemoved(reason) {
@@ -526,124 +538,132 @@ class DirectoryWatcher extends EventEmitter {
526538
return;
527539
}
528540
this.scanning = true;
529-
fs.readdir(this.path, (err, items) => {
541+
if (this.timeout) {
542+
clearTimeout(this.timeout);
543+
this.timeout = undefined;
544+
}
545+
process.nextTick(() => {
530546
if (this.closed) return;
531-
if (err) {
532-
if (err.code === "ENOENT" || err.code === "EPERM") {
533-
this.onDirectoryRemoved("scan readdir failed");
534-
} else {
535-
this.onScanError(err);
536-
}
537-
this.initialScan = false;
538-
this.initialScanFinished = Date.now();
539-
if (initial) {
540-
for (const watchers of this.watchers.values()) {
541-
for (const watcher of watchers) {
542-
if (watcher.checkStartTime(this.initialScanFinished, false)) {
543-
watcher.emit(
544-
"initial-missing",
545-
"scan (parent directory missing in initial scan)"
546-
);
547+
fs.readdir(this.path, (err, items) => {
548+
if (this.closed) return;
549+
if (err) {
550+
if (err.code === "ENOENT" || err.code === "EPERM") {
551+
this.onDirectoryRemoved("scan readdir failed");
552+
} else {
553+
this.onScanError(err);
554+
}
555+
this.initialScan = false;
556+
this.initialScanFinished = Date.now();
557+
if (initial) {
558+
for (const watchers of this.watchers.values()) {
559+
for (const watcher of watchers) {
560+
if (watcher.checkStartTime(this.initialScanFinished, false)) {
561+
watcher.emit(
562+
"initial-missing",
563+
"scan (parent directory missing in initial scan)"
564+
);
565+
}
547566
}
548567
}
549568
}
569+
if (this.scanAgain) {
570+
this.scanAgain = false;
571+
this.doScan(this.scanAgainInitial);
572+
} else {
573+
this.scanning = false;
574+
}
575+
return;
550576
}
551-
if (this.scanAgain) {
552-
this.scanAgain = false;
553-
this.doScan(this.scanAgainInitial);
554-
} else {
555-
this.scanning = false;
556-
}
557-
return;
558-
}
559-
const itemPaths = new Set(
560-
items.map(item => path.join(this.path, item.normalize("NFC")))
561-
);
562-
for (const file of this.files.keys()) {
563-
if (!itemPaths.has(file)) {
564-
this.setMissing(file, initial, "scan (missing)");
565-
}
566-
}
567-
for (const directory of this.directories.keys()) {
568-
if (!itemPaths.has(directory)) {
569-
this.setMissing(directory, initial, "scan (missing)");
570-
}
571-
}
572-
if (this.scanAgain) {
573-
// Early repeat of scan
574-
this.scanAgain = false;
575-
this.doScan(initial);
576-
return;
577-
}
578-
const itemFinished = needCalls(itemPaths.size + 1, () => {
579-
if (this.closed) return;
580-
this.initialScan = false;
581-
this.initialScanRemoved = null;
582-
this.initialScanFinished = Date.now();
583-
if (initial) {
584-
const missingWatchers = new Map(this.watchers);
585-
missingWatchers.delete(withoutCase(this.path));
586-
for (const item of itemPaths) {
587-
missingWatchers.delete(withoutCase(item));
577+
const itemPaths = new Set(
578+
items.map(item => path.join(this.path, item.normalize("NFC")))
579+
);
580+
for (const file of this.files.keys()) {
581+
if (!itemPaths.has(file)) {
582+
this.setMissing(file, initial, "scan (missing)");
588583
}
589-
for (const watchers of missingWatchers.values()) {
590-
for (const watcher of watchers) {
591-
if (watcher.checkStartTime(this.initialScanFinished, false)) {
592-
watcher.emit(
593-
"initial-missing",
594-
"scan (missing in initial scan)"
595-
);
596-
}
597-
}
584+
}
585+
for (const directory of this.directories.keys()) {
586+
if (!itemPaths.has(directory)) {
587+
this.setMissing(directory, initial, "scan (missing)");
598588
}
599589
}
600590
if (this.scanAgain) {
591+
// Early repeat of scan
601592
this.scanAgain = false;
602-
this.doScan(this.scanAgainInitial);
603-
} else {
604-
this.scanning = false;
593+
this.doScan(initial);
594+
return;
605595
}
606-
});
607-
for (const itemPath of itemPaths) {
608-
fs.lstat(itemPath, (err2, stats) => {
596+
const itemFinished = needCalls(itemPaths.size + 1, () => {
609597
if (this.closed) return;
610-
if (err2) {
611-
if (
612-
err2.code === "ENOENT" ||
613-
err2.code === "EPERM" ||
614-
err2.code === "EBUSY"
615-
) {
616-
this.setMissing(itemPath, initial, "scan (" + err2.code + ")");
617-
} else {
618-
this.onScanError(err2);
598+
this.initialScan = false;
599+
this.initialScanRemoved = null;
600+
this.initialScanFinished = Date.now();
601+
if (initial) {
602+
const missingWatchers = new Map(this.watchers);
603+
missingWatchers.delete(withoutCase(this.path));
604+
for (const item of itemPaths) {
605+
missingWatchers.delete(withoutCase(item));
606+
}
607+
for (const watchers of missingWatchers.values()) {
608+
for (const watcher of watchers) {
609+
if (watcher.checkStartTime(this.initialScanFinished, false)) {
610+
watcher.emit(
611+
"initial-missing",
612+
"scan (missing in initial scan)"
613+
);
614+
}
615+
}
619616
}
620-
itemFinished();
621-
return;
622617
}
623-
if (stats.isFile() || stats.isSymbolicLink()) {
624-
if (stats.mtime) {
625-
ensureFsAccuracy(stats.mtime);
618+
if (this.scanAgain) {
619+
this.scanAgain = false;
620+
this.doScan(this.scanAgainInitial);
621+
} else {
622+
this.scanning = false;
623+
this.onScanFinished();
624+
}
625+
});
626+
for (const itemPath of itemPaths) {
627+
fs.lstat(itemPath, (err2, stats) => {
628+
if (this.closed) return;
629+
if (err2) {
630+
if (
631+
err2.code === "ENOENT" ||
632+
err2.code === "EPERM" ||
633+
err2.code === "EBUSY"
634+
) {
635+
this.setMissing(itemPath, initial, "scan (" + err2.code + ")");
636+
} else {
637+
this.onScanError(err2);
638+
}
639+
itemFinished();
640+
return;
626641
}
627-
this.setFileTime(
628-
itemPath,
629-
+stats.mtime || +stats.ctime || 1,
630-
initial,
631-
true,
632-
"scan (file)"
633-
);
634-
} else if (stats.isDirectory()) {
635-
if (!initial || !this.directories.has(itemPath))
636-
this.setDirectory(
642+
if (stats.isFile() || stats.isSymbolicLink()) {
643+
if (stats.mtime) {
644+
ensureFsAccuracy(stats.mtime);
645+
}
646+
this.setFileTime(
637647
itemPath,
638648
+stats.mtime || +stats.ctime || 1,
639649
initial,
640-
"scan (dir)"
650+
true,
651+
"scan (file)"
641652
);
642-
}
643-
itemFinished();
644-
});
645-
}
646-
itemFinished();
653+
} else if (stats.isDirectory()) {
654+
if (!initial || !this.directories.has(itemPath))
655+
this.setDirectory(
656+
itemPath,
657+
+stats.mtime || +stats.ctime || 1,
658+
initial,
659+
"scan (dir)"
660+
);
661+
}
662+
itemFinished();
663+
});
664+
}
665+
itemFinished();
666+
});
647667
});
648668
}
649669

‎lib/getWatcherManager.js

+4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ class WatcherManager {
3838
}
3939

4040
const watcherManagers = new WeakMap();
41+
/**
42+
* @param {object} options options
43+
* @returns {WatcherManager} the watcher manager
44+
*/
4145
module.exports = options => {
4246
const watcherManager = watcherManagers.get(options);
4347
if (watcherManager !== undefined) return watcherManager;

‎lib/reducePlan.js

+132
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
MIT License http://www.opensource.org/licenses/mit-license.php
3+
Author Tobias Koppers @sokra
4+
*/
5+
"use strict";
6+
7+
const path = require("path");
8+
9+
/**
10+
* @template T
11+
* @typedef {Object} TreeNode
12+
* @property {string} filePath
13+
* @property {TreeNode} parent
14+
* @property {TreeNode[]} children
15+
* @property {number} entries
16+
* @property {boolean} active
17+
* @property {T[] | T | undefined} value
18+
*/
19+
20+
/**
21+
* @template T
22+
* @param {Map<string, T[] | T} plan
23+
* @param {number} limit
24+
* @returns {Map<string, Map<T, string>>} the new plan
25+
*/
26+
module.exports = (plan, limit) => {
27+
const treeMap = new Map();
28+
// Convert to tree
29+
for (const [filePath, value] of plan) {
30+
treeMap.set(filePath, {
31+
filePath,
32+
parent: undefined,
33+
children: undefined,
34+
entries: 1,
35+
active: true,
36+
value
37+
});
38+
}
39+
let currentCount = treeMap.size;
40+
// Create parents and calculate sum of entries
41+
for (const node of treeMap.values()) {
42+
const parentPath = path.dirname(node.filePath);
43+
if (parentPath !== node.filePath) {
44+
let parent = treeMap.get(parentPath);
45+
if (parent === undefined) {
46+
parent = {
47+
filePath: parentPath,
48+
parent: undefined,
49+
children: [node],
50+
entries: node.entries,
51+
active: false,
52+
value: undefined
53+
};
54+
treeMap.set(parentPath, parent);
55+
} else {
56+
if (parent.children === undefined) {
57+
parent.children = [node];
58+
} else {
59+
parent.children.push(node);
60+
}
61+
do {
62+
parent.entries += node.entries;
63+
parent = parent.parent;
64+
} while (parent);
65+
}
66+
}
67+
}
68+
// Reduce until limit reached
69+
while (currentCount > limit) {
70+
// Select node that helps reaching the limit most effectively without overmerging
71+
const overLimit = limit - currentCount;
72+
let bestNode = undefined;
73+
let bestCost = Infinity;
74+
for (const node of treeMap.values()) {
75+
if (node.entries <= 1 || !node.children) continue;
76+
if (node.children.length <= 1) continue;
77+
// Try to select the node with has just a bit more entries than we need to reduce
78+
// When just a bit more is over 30% over the limit,
79+
// also consider just a bit less entries then we need to reduce
80+
const cost =
81+
node.entries - 1 >= overLimit
82+
? node.entries - 1 - overLimit
83+
: overLimit - node.entries + 1 + limit * 0.3;
84+
if (cost < bestCost) {
85+
bestNode = node;
86+
bestCost = cost;
87+
}
88+
}
89+
if (!bestNode) break;
90+
// Merge all children
91+
const reduction = bestNode.entries - 1;
92+
bestNode.active = true;
93+
bestNode.entries = 1;
94+
currentCount -= reduction;
95+
let parent = bestNode.parent;
96+
while (parent) {
97+
parent.entries -= reduction;
98+
parent = parent.parent;
99+
}
100+
const queue = new Set(bestNode.children);
101+
for (const node of queue) {
102+
node.active = false;
103+
node.entries = 0;
104+
if (node.children) {
105+
for (const child of node.children) queue.add(child);
106+
}
107+
}
108+
}
109+
// Write down new plan
110+
const newPlan = new Map();
111+
for (const node of treeMap.values()) {
112+
if (!node.active) continue;
113+
const map = new Map();
114+
const queue = new Set([node]);
115+
for (const node of queue) {
116+
if (node.value) {
117+
if (Array.isArray(node.value)) {
118+
for (const item of node.value) {
119+
map.set(item, node.filePath);
120+
}
121+
} else {
122+
map.set(node.value, node.filePath);
123+
}
124+
}
125+
if (node.children) {
126+
for (const child of node.children) queue.add(child);
127+
}
128+
}
129+
newPlan.set(node.filePath, map);
130+
}
131+
return newPlan;
132+
};

‎lib/watchEventSource.js

+306
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
/*
2+
MIT License http://www.opensource.org/licenses/mit-license.php
3+
Author Tobias Koppers @sokra
4+
*/
5+
"use strict";
6+
7+
const fs = require("fs");
8+
const path = require("path");
9+
const { EventEmitter } = require("events");
10+
const reducePlan = require("./reducePlan");
11+
12+
const IS_OSX = require("os").platform() === "darwin";
13+
const IS_WIN = require("os").platform() === "win32";
14+
const SUPPORTS_RECURSIVE_WATCHING = IS_OSX || IS_WIN;
15+
16+
const watcherLimit =
17+
+process.env.WATCHPACK_WATCHER_LIMIT || (IS_OSX ? 2000 : 10000);
18+
19+
let isBatch = false;
20+
let watcherCount = 0;
21+
22+
/** @type {Map<Watcher, string>} */
23+
const pendingWatchers = new Map();
24+
25+
/** @type {Map<string, RecursiveWatcher>} */
26+
const recursiveWatchers = new Map();
27+
28+
/** @type {Map<string, DirectWatcher>} */
29+
const directWatchers = new Map();
30+
31+
/** @type {Map<Watcher, RecursiveWatcher | DirectWatcher>} */
32+
const underlyingWatcher = new Map();
33+
34+
class DirectWatcher {
35+
constructor(filePath) {
36+
this.filePath = filePath;
37+
this.watchers = new Set();
38+
this.watcher = undefined;
39+
try {
40+
const watcher = fs.watch(filePath);
41+
this.watcher = watcher;
42+
watcher.on("change", (type, filename) => {
43+
for (const w of this.watchers) {
44+
w.emit("change", type, filename);
45+
}
46+
});
47+
watcher.on("error", error => {
48+
for (const w of this.watchers) {
49+
w.emit("error", error);
50+
}
51+
});
52+
} catch (err) {
53+
process.nextTick(() => {
54+
for (const w of this.watchers) {
55+
w.emit("error", err);
56+
}
57+
});
58+
}
59+
watcherCount++;
60+
}
61+
62+
add(watcher) {
63+
underlyingWatcher.set(watcher, this);
64+
this.watchers.add(watcher);
65+
}
66+
67+
remove(watcher) {
68+
this.watchers.delete(watcher);
69+
if (this.watchers.size === 0) {
70+
directWatchers.delete(this.filePath);
71+
watcherCount--;
72+
if (this.watcher) this.watcher.close();
73+
}
74+
}
75+
76+
getWatchers() {
77+
return this.watchers;
78+
}
79+
}
80+
81+
class RecursiveWatcher {
82+
constructor(rootPath) {
83+
this.rootPath = rootPath;
84+
/** @type {Map<Watcher, string>} */
85+
this.mapWatcherToPath = new Map();
86+
/** @type {Map<string, Set<Watcher>>} */
87+
this.mapPathToWatchers = new Map();
88+
this.watcher = undefined;
89+
try {
90+
const watcher = fs.watch(rootPath, {
91+
recursive: true
92+
});
93+
this.watcher = watcher;
94+
watcher.on("change", (type, filename) => {
95+
if (!filename) {
96+
for (const w of this.mapWatcherToPath.keys()) {
97+
w.emit("change", type);
98+
}
99+
} else {
100+
const dir = path.dirname(filename);
101+
const watchers = this.mapPathToWatchers.get(dir);
102+
if (watchers === undefined) return;
103+
for (const w of watchers) {
104+
w.emit("change", type, path.basename(filename));
105+
}
106+
}
107+
});
108+
watcher.on("error", error => {
109+
for (const w of this.mapWatcherToPath.keys()) {
110+
w.emit("error", error);
111+
}
112+
});
113+
} catch (err) {
114+
process.nextTick(() => {
115+
for (const w of this.mapWatcherToPath.keys()) {
116+
w.emit("error", err);
117+
}
118+
});
119+
}
120+
watcherCount++;
121+
}
122+
123+
add(filePath, watcher) {
124+
underlyingWatcher.set(watcher, this);
125+
const subpath = filePath.slice(this.rootPath.length + 1) || ".";
126+
this.mapWatcherToPath.set(watcher, subpath);
127+
const set = this.mapPathToWatchers.get(subpath);
128+
if (set === undefined) {
129+
const newSet = new Set();
130+
newSet.add(watcher);
131+
this.mapPathToWatchers.set(subpath, newSet);
132+
} else {
133+
set.add(watcher);
134+
}
135+
}
136+
137+
remove(watcher) {
138+
const subpath = this.mapWatcherToPath.get(watcher);
139+
if (!subpath) return;
140+
this.mapWatcherToPath.delete(watcher);
141+
const set = this.mapPathToWatchers.get(subpath);
142+
set.delete(watcher);
143+
if (set.size === 0) {
144+
this.mapPathToWatchers.delete(subpath);
145+
}
146+
if (this.mapWatcherToPath.size === 0) {
147+
recursiveWatchers.delete(this.rootPath);
148+
watcherCount--;
149+
if (this.watcher) this.watcher.close();
150+
}
151+
}
152+
153+
getWatchers() {
154+
return this.mapWatcherToPath;
155+
}
156+
}
157+
158+
class Watcher extends EventEmitter {
159+
close() {
160+
if (pendingWatchers.has(this)) {
161+
pendingWatchers.delete(this);
162+
return;
163+
}
164+
const watcher = underlyingWatcher.get(this);
165+
watcher.remove(this);
166+
underlyingWatcher.delete(this);
167+
}
168+
}
169+
170+
const createDirectWatcher = filePath => {
171+
const existing = directWatchers.get(filePath);
172+
if (existing !== undefined) return existing;
173+
const w = new DirectWatcher(filePath);
174+
directWatchers.set(filePath, w);
175+
return w;
176+
};
177+
178+
const createRecursiveWatcher = rootPath => {
179+
const existing = recursiveWatchers.get(rootPath);
180+
if (existing !== undefined) return existing;
181+
const w = new RecursiveWatcher(rootPath);
182+
recursiveWatchers.set(rootPath, w);
183+
return w;
184+
};
185+
186+
const execute = () => {
187+
/** @type {Map<string, Watcher[] | Watcher>} */
188+
const map = new Map();
189+
const addWatcher = (watcher, filePath) => {
190+
const entry = map.get(filePath);
191+
if (entry === undefined) {
192+
map.set(filePath, watcher);
193+
} else if (Array.isArray(entry)) {
194+
entry.push(watcher);
195+
} else {
196+
map.set(filePath, [entry, watcher]);
197+
}
198+
};
199+
for (const [watcher, filePath] of pendingWatchers) {
200+
addWatcher(watcher, filePath);
201+
}
202+
pendingWatchers.clear();
203+
204+
// Fast case when we are not reaching the limit
205+
if (!SUPPORTS_RECURSIVE_WATCHING || watcherLimit - watcherCount >= map.size) {
206+
// Create watchers for all entries in the map
207+
for (const [filePath, entry] of map) {
208+
const w = createDirectWatcher(filePath);
209+
if (Array.isArray(entry)) {
210+
for (const item of entry) w.add(item);
211+
} else {
212+
w.add(entry);
213+
}
214+
}
215+
return;
216+
}
217+
218+
// Reconsider existing watchers to improving watch plan
219+
for (const watcher of recursiveWatchers.values()) {
220+
for (const [w, subpath] of watcher.getWatchers()) {
221+
addWatcher(w, path.join(watcher.rootPath, subpath));
222+
}
223+
}
224+
for (const watcher of directWatchers.values()) {
225+
for (const w of watcher.getWatchers()) {
226+
addWatcher(w, watcher.filePath);
227+
}
228+
}
229+
230+
// Merge map entries to keep watcher limit
231+
// Create a 10% buffer to be able to enter fast case more often
232+
const plan = reducePlan(map, watcherLimit * 0.9);
233+
234+
// Update watchers for all entries in the map
235+
for (const [filePath, entry] of plan) {
236+
if (entry.size === 1) {
237+
for (const [watcher, filePath] of entry) {
238+
const w = createDirectWatcher(filePath);
239+
const old = underlyingWatcher.get(watcher);
240+
if (old === w) continue;
241+
w.add(watcher);
242+
if (old !== undefined) old.remove(watcher);
243+
}
244+
} else {
245+
const filePaths = new Set(entry.values());
246+
if (filePaths.size > 1) {
247+
const w = createRecursiveWatcher(filePath);
248+
for (const [watcher, watcherPath] of entry) {
249+
const old = underlyingWatcher.get(watcher);
250+
if (old === w) continue;
251+
w.add(watcherPath, watcher);
252+
if (old !== undefined) old.remove(watcher);
253+
}
254+
} else {
255+
for (const filePath of filePaths) {
256+
const w = createDirectWatcher(filePath);
257+
for (const watcher of entry.keys()) {
258+
const old = underlyingWatcher.get(watcher);
259+
if (old === w) continue;
260+
w.add(watcher);
261+
if (old !== undefined) old.remove(watcher);
262+
}
263+
}
264+
}
265+
}
266+
}
267+
};
268+
269+
exports.watch = filePath => {
270+
const watcher = new Watcher();
271+
// Find an existing watcher
272+
const directWatcher = directWatchers.get(filePath);
273+
if (directWatcher !== undefined) {
274+
directWatcher.add(watcher);
275+
return watcher;
276+
}
277+
let current = filePath;
278+
for (;;) {
279+
const recursiveWatcher = recursiveWatchers.get(current);
280+
if (recursiveWatcher !== undefined) {
281+
recursiveWatcher.add(filePath, watcher);
282+
return watcher;
283+
}
284+
const parent = path.dirname(current);
285+
if (parent === current) break;
286+
current = parent;
287+
}
288+
// Queue up watcher for creation
289+
pendingWatchers.set(watcher, filePath);
290+
if (!isBatch) execute();
291+
return watcher;
292+
};
293+
294+
exports.batch = fn => {
295+
isBatch = true;
296+
try {
297+
fn();
298+
} finally {
299+
isBatch = false;
300+
execute();
301+
}
302+
};
303+
304+
exports.getNumberOfWatchers = () => {
305+
return watcherCount;
306+
};

‎lib/watchpack.js

+127-57
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ const getWatcherManager = require("./getWatcherManager");
88
const LinkResolver = require("./LinkResolver");
99
const EventEmitter = require("events").EventEmitter;
1010
const globToRegExp = require("glob-to-regexp");
11+
const watchEventSource = require("./watchEventSource");
1112

1213
let EXISTANCE_ONLY_TIME_ENTRY; // lazy required
1314

1415
const EMPTY_ARRAY = [];
16+
const EMPTY_OPTIONS = {};
1517

1618
function addWatchersToSet(watchers, set) {
1719
for (const w of watchers) {
@@ -63,18 +65,21 @@ const cachedNormalizeOptions = options => {
6365
class Watchpack extends EventEmitter {
6466
constructor(options) {
6567
super();
66-
if (!options) options = {};
67-
if (typeof options.aggregateTimeout !== "number") {
68-
options.aggregateTimeout = 200;
69-
}
68+
if (!options) options = EMPTY_OPTIONS;
7069
this.options = options;
70+
this.aggregateTimeout =
71+
typeof options.aggregateTimeout === "number"
72+
? options.aggregateTimeout
73+
: 200;
7174
this.watcherOptions = cachedNormalizeOptions(options);
7275
this.watcherManager = getWatcherManager(this.watcherOptions);
73-
this.watchers = [];
76+
this.fileWatchers = new Map();
77+
this.directoryWatchers = new Map();
78+
this.startTime = undefined;
7479
this.paused = false;
7580
this.aggregatedChanges = new Set();
7681
this.aggregatedRemovals = new Set();
77-
this.aggregateTimeout = 0;
82+
this.aggregateTimer = undefined;
7883
this._onTimeout = this._onTimeout.bind(this);
7984
}
8085

@@ -94,23 +99,30 @@ class Watchpack extends EventEmitter {
9499
startTime = arg3;
95100
}
96101
this.paused = false;
97-
const oldWatchers = this.watchers;
102+
const oldFileWatchers = this.fileWatchers;
103+
const oldDirectoryWatchers = this.directoryWatchers;
98104
const ignored = this.watcherOptions.ignored;
99105
const filter = ignored
100106
? path => !ignored.test(path.replace(/\\/g, "/"))
101107
: () => true;
102-
this.watchers = [];
108+
const addToMap = (map, key, item) => {
109+
const list = map.get(key);
110+
if (list === undefined) {
111+
map.set(key, [item]);
112+
} else {
113+
list.push(item);
114+
}
115+
};
116+
const fileWatchersNeeded = new Map();
117+
const directoryWatchersNeeded = new Map();
118+
const missingFiles = new Set();
103119
if (this.watcherOptions.followSymlinks) {
104120
const resolver = new LinkResolver();
105121
for (const file of files) {
106122
if (filter(file)) {
107123
for (const innerFile of resolver.resolve(file)) {
108124
if (file === innerFile || filter(innerFile)) {
109-
const watcher = this._fileWatcher(
110-
file,
111-
this.watcherManager.watchFile(innerFile, startTime)
112-
);
113-
if (watcher) this.watchers.push(watcher);
125+
addToMap(fileWatchersNeeded, innerFile, file);
114126
}
115127
}
116128
}
@@ -119,11 +131,8 @@ class Watchpack extends EventEmitter {
119131
if (filter(file)) {
120132
for (const innerFile of resolver.resolve(file)) {
121133
if (file === innerFile || filter(innerFile)) {
122-
const watcher = this._missingWatcher(
123-
file,
124-
this.watcherManager.watchFile(innerFile, startTime)
125-
);
126-
if (watcher) this.watchers.push(watcher);
134+
missingFiles.add(file);
135+
addToMap(fileWatchersNeeded, innerFile, file);
127136
}
128137
}
129138
}
@@ -133,13 +142,11 @@ class Watchpack extends EventEmitter {
133142
let first = true;
134143
for (const innerItem of resolver.resolve(dir)) {
135144
if (filter(innerItem)) {
136-
const watcher = this._dirWatcher(
137-
dir,
138-
first
139-
? this.watcherManager.watchDirectory(innerItem, startTime)
140-
: this.watcherManager.watchFile(innerItem, startTime)
145+
addToMap(
146+
first ? directoryWatchersNeeded : fileWatchersNeeded,
147+
innerItem,
148+
dir
141149
);
142-
if (watcher) this.watchers.push(watcher);
143150
}
144151
first = false;
145152
}
@@ -148,50 +155,118 @@ class Watchpack extends EventEmitter {
148155
} else {
149156
for (const file of files) {
150157
if (filter(file)) {
151-
const watcher = this._fileWatcher(
152-
file,
153-
this.watcherManager.watchFile(file, startTime)
154-
);
155-
if (watcher) this.watchers.push(watcher);
158+
addToMap(fileWatchersNeeded, file, file);
156159
}
157160
}
158161
for (const file of missing) {
159162
if (filter(file)) {
160-
const watcher = this._missingWatcher(
161-
file,
162-
this.watcherManager.watchFile(file, startTime)
163-
);
164-
if (watcher) this.watchers.push(watcher);
163+
missingFiles.add(file);
164+
addToMap(fileWatchersNeeded, file, file);
165165
}
166166
}
167167
for (const dir of directories) {
168168
if (filter(dir)) {
169-
const watcher = this._dirWatcher(
170-
dir,
171-
this.watcherManager.watchDirectory(dir, startTime)
172-
);
173-
if (watcher) this.watchers.push(watcher);
169+
addToMap(directoryWatchersNeeded, dir, dir);
170+
}
171+
}
172+
}
173+
const newFileWatchers = new Map();
174+
const newDirectoryWatchers = new Map();
175+
const setupFileWatcher = (watcher, key, files) => {
176+
watcher.on("initial-missing", type => {
177+
for (const file of files) {
178+
if (!missingFiles.has(file)) this._onRemove(file, file, type);
179+
}
180+
});
181+
watcher.on("change", (mtime, type) => {
182+
for (const file of files) {
183+
this._onChange(file, mtime, file, type);
184+
}
185+
});
186+
watcher.on("remove", type => {
187+
for (const file of files) {
188+
this._onRemove(file, file, type);
189+
}
190+
});
191+
newFileWatchers.set(key, watcher);
192+
};
193+
const setupDirectoryWatcher = (watcher, key, directories) => {
194+
watcher.on("initial-missing", type => {
195+
for (const item of directories) {
196+
this._onRemove(item, item, type);
197+
}
198+
});
199+
watcher.on("change", (file, mtime, type) => {
200+
for (const item of directories) {
201+
this._onChange(item, mtime, file, type);
174202
}
203+
});
204+
watcher.on("remove", type => {
205+
for (const item of directories) {
206+
this._onRemove(item, item, type);
207+
}
208+
});
209+
newDirectoryWatchers.set(key, watcher);
210+
};
211+
// Close unneeded old watchers
212+
const fileWatchersToClose = [];
213+
const directoryWatchersToClose = [];
214+
for (const [key, w] of oldFileWatchers) {
215+
if (!fileWatchersNeeded.has(key)) {
216+
w.close();
217+
} else {
218+
fileWatchersToClose.push(w);
175219
}
176220
}
177-
for (const w of oldWatchers) w.close();
221+
for (const [key, w] of oldDirectoryWatchers) {
222+
if (!directoryWatchersNeeded.has(key)) {
223+
w.close();
224+
} else {
225+
directoryWatchersToClose.push(w);
226+
}
227+
}
228+
// Create new watchers and install handlers on these watchers
229+
watchEventSource.batch(() => {
230+
for (const [key, files] of fileWatchersNeeded) {
231+
const watcher = this.watcherManager.watchFile(key, startTime);
232+
if (watcher) {
233+
setupFileWatcher(watcher, key, files);
234+
}
235+
}
236+
for (const [key, directories] of directoryWatchersNeeded) {
237+
const watcher = this.watcherManager.watchDirectory(key, startTime);
238+
if (watcher) {
239+
setupDirectoryWatcher(watcher, key, directories);
240+
}
241+
}
242+
});
243+
// Close old watchers
244+
for (const w of fileWatchersToClose) w.close();
245+
for (const w of directoryWatchersToClose) w.close();
246+
// Store watchers
247+
this.fileWatchers = newFileWatchers;
248+
this.directoryWatchers = newDirectoryWatchers;
249+
this.startTime = startTime;
178250
}
179251

180252
close() {
181253
this.paused = true;
182-
if (this.aggregateTimeout) clearTimeout(this.aggregateTimeout);
183-
for (const w of this.watchers) w.close();
184-
this.watchers.length = 0;
254+
if (this.aggregateTimer) clearTimeout(this.aggregateTimer);
255+
for (const w of this.fileWatchers.values()) w.close();
256+
for (const w of this.directoryWatchers.values()) w.close();
257+
this.fileWatchers.clear();
258+
this.directoryWatchers.clear();
185259
}
186260

187261
pause() {
188262
this.paused = true;
189-
if (this.aggregateTimeout) clearTimeout(this.aggregateTimeout);
263+
if (this.aggregateTimer) clearTimeout(this.aggregateTimer);
190264
}
191265

192266
getTimes() {
193267
const directoryWatchers = new Set();
194-
addWatchersToSet(this.watchers, directoryWatchers);
268+
addWatchersToSet(this.fileWatchers.values(), directoryWatchers);
269+
addWatchersToSet(this.directoryWatchers.values(), directoryWatchers);
195270
const obj = Object.create(null);
196271
for (const w of directoryWatchers) {
197272
const times = w.getTimes();
@@ -206,7 +281,8 @@ class Watchpack extends EventEmitter {
206281
.EXISTANCE_ONLY_TIME_ENTRY;
207282
}
208283
const directoryWatchers = new Set();
209-
addWatchersToSet(this.watchers, directoryWatchers);
284+
addWatchersToSet(this.fileWatchers.values(), directoryWatchers);
285+
addWatchersToSet(this.directoryWatchers.values(), directoryWatchers);
210286
const map = new Map();
211287
for (const w of directoryWatchers) {
212288
const times = w.getTimeInfoEntries();
@@ -270,30 +346,24 @@ class Watchpack extends EventEmitter {
270346
file = file || item;
271347
if (this.paused) return;
272348
this.emit("change", file, mtime, type);
273-
if (this.aggregateTimeout) clearTimeout(this.aggregateTimeout);
349+
if (this.aggregateTimer) clearTimeout(this.aggregateTimer);
274350
this.aggregatedRemovals.delete(item);
275351
this.aggregatedChanges.add(item);
276-
this.aggregateTimeout = setTimeout(
277-
this._onTimeout,
278-
this.options.aggregateTimeout
279-
);
352+
this.aggregateTimer = setTimeout(this._onTimeout, this.aggregateTimeout);
280353
}
281354

282355
_onRemove(item, file, type) {
283356
file = file || item;
284357
if (this.paused) return;
285358
this.emit("remove", file, type);
286-
if (this.aggregateTimeout) clearTimeout(this.aggregateTimeout);
359+
if (this.aggregateTimer) clearTimeout(this.aggregateTimer);
287360
this.aggregatedChanges.delete(item);
288361
this.aggregatedRemovals.add(item);
289-
this.aggregateTimeout = setTimeout(
290-
this._onTimeout,
291-
this.options.aggregateTimeout
292-
);
362+
this.aggregateTimer = setTimeout(this._onTimeout, this.aggregateTimeout);
293363
}
294364

295365
_onTimeout() {
296-
this.aggregateTimeout = 0;
366+
this.aggregateTimer = undefined;
297367
const changes = this.aggregatedChanges;
298368
const removals = this.aggregatedRemovals;
299369
this.aggregatedChanges = new Set();

‎test/ManyWatchers.js

+63
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
/*globals describe it beforeEach afterEach */
2+
"use strict";
3+
4+
require("should");
5+
const path = require("path");
6+
const TestHelper = require("./helpers/TestHelper");
7+
const Watchpack = require("../lib/watchpack");
8+
const watchEventSource = require("../lib/watchEventSource");
9+
10+
const fixtures = path.join(__dirname, "fixtures");
11+
const testHelper = new TestHelper(fixtures);
12+
13+
describe("ManyWatchers", function() {
14+
this.timeout(180000);
15+
beforeEach(testHelper.before);
16+
afterEach(testHelper.after);
17+
18+
it("should watch more than 4096 directories", done => {
19+
const files = [];
20+
for (let i = 1; i <= 5000; i++) {
21+
let highBit = 1;
22+
let j = i;
23+
while (j > 1) {
24+
highBit <<= 1;
25+
j >>= 1;
26+
}
27+
const dir = `${i & highBit}/${i & ~highBit}`;
28+
if (i === highBit) {
29+
testHelper.dir(`${i}`);
30+
testHelper.file(`${i}/file`);
31+
files.push(path.join(fixtures, `${i}`, "file"));
32+
}
33+
testHelper.dir(dir);
34+
testHelper.file(`${dir}/file`);
35+
files.push(path.join(fixtures, dir, "file"));
36+
if (i === highBit) {
37+
testHelper.file(`${dir}/file2`);
38+
files.push(path.join(fixtures, dir, "file2"));
39+
}
40+
}
41+
testHelper.tick(1000, () => {
42+
const w = new Watchpack({
43+
aggregateTimeout: 1000
44+
});
45+
w.on("aggregated", function(changes) {
46+
Array.from(changes).should.be.eql([
47+
path.join(fixtures, "4096/900/file")
48+
]);
49+
w.close();
50+
done();
51+
});
52+
for (let i = 100; i < files.length; i += 987) {
53+
for (let j = 0; j < files.length - i; j += 987) {
54+
w.watch({ files: files.slice(j, j + i) });
55+
}
56+
}
57+
w.watch({ files });
58+
testHelper.tick(10000, () => {
59+
testHelper.file("4096/900/file");
60+
});
61+
});
62+
});
63+
});

‎test/Watchpack.js

+96-7
Original file line numberDiff line numberDiff line change
@@ -590,12 +590,11 @@ describe("Watchpack", function() {
590590
});
591591

592592
it("should detect a single change to future timestamps", function(done) {
593-
var w = new Watchpack({
594-
aggregateTimeout: 1000
595-
});
596-
var w2 = new Watchpack({
593+
const options = {
597594
aggregateTimeout: 1000
598-
});
595+
};
596+
var w = new Watchpack(options);
597+
var w2 = new Watchpack(options);
599598
w.on("change", function() {
600599
throw new Error("should not report change event");
601600
});
@@ -604,12 +603,37 @@ describe("Watchpack", function() {
604603
});
605604
testHelper.file("a");
606605
testHelper.tick(400, function() {
607-
w2.watch([path.join(fixtures, "a")], []);
606+
w2.watch([path.join(fixtures, "a")], [], Date.now());
608607
testHelper.tick(1000, function() {
609608
// wait for initial scan
610609
testHelper.mtime("a", Date.now() + 1000000);
611610
testHelper.tick(400, function() {
612-
w.watch([path.join(fixtures, "a")], []);
611+
w.watch([path.join(fixtures, "a")], [], Date.now());
612+
testHelper.tick(1000, function() {
613+
w2.close();
614+
w.close();
615+
done();
616+
});
617+
});
618+
});
619+
});
620+
});
621+
622+
it("should create different watchers for different options", function(done) {
623+
var w = new Watchpack({
624+
aggregateTimeout: 1000
625+
});
626+
var w2 = new Watchpack({
627+
aggregateTimeout: 1000
628+
});
629+
testHelper.file("a");
630+
testHelper.tick(400, function() {
631+
w.watch([path.join(fixtures, "a")], [], Date.now());
632+
w2.watch([path.join(fixtures, "a")], [], Date.now());
633+
testHelper.tick(1000, function() {
634+
testHelper.file("a");
635+
testHelper.tick(400, function() {
636+
testHelper.file("a");
613637
testHelper.tick(1000, function() {
614638
w2.close();
615639
w.close();
@@ -894,6 +918,71 @@ describe("Watchpack", function() {
894918
});
895919
});
896920

921+
it("should allow to reuse watchers when watch is called again", function(done) {
922+
var w = new Watchpack({
923+
aggregateTimeout: 1000
924+
});
925+
w.on("aggregated", () => {
926+
done(new Error("should not fire"));
927+
});
928+
testHelper.dir("dir");
929+
testHelper.dir("dir/b");
930+
testHelper.dir("dir/b/sub");
931+
testHelper.file("dir/b/sub/file");
932+
testHelper.file("dir/b/file");
933+
testHelper.dir("dir/b1");
934+
testHelper.dir("dir/b1/sub");
935+
testHelper.file("dir/b1/sub/file");
936+
testHelper.file("dir/b1/file");
937+
testHelper.dir("dir/b2");
938+
testHelper.dir("dir/b2/sub");
939+
testHelper.file("dir/b2/sub/file");
940+
testHelper.file("dir/b2/file");
941+
testHelper.file("dir/a");
942+
testHelper.file("dir/a1");
943+
testHelper.file("dir/a2");
944+
testHelper.tick(() => {
945+
w.watch({
946+
files: [
947+
path.join(fixtures, "dir", "a"),
948+
path.join(fixtures, "dir", "a1")
949+
],
950+
directories: [
951+
path.join(fixtures, "dir", "b"),
952+
path.join(fixtures, "dir", "b1")
953+
],
954+
missing: [
955+
path.join(fixtures, "dir", "c"),
956+
path.join(fixtures, "dir", "c1")
957+
]
958+
});
959+
testHelper.tick(() => {
960+
w.watch({
961+
files: [
962+
path.join(fixtures, "dir", "a"),
963+
path.join(fixtures, "dir", "a2")
964+
],
965+
directories: [
966+
path.join(fixtures, "dir", "b"),
967+
path.join(fixtures, "dir", "b2")
968+
],
969+
missing: [
970+
path.join(fixtures, "dir", "c"),
971+
path.join(fixtures, "dir", "c2")
972+
]
973+
});
974+
testHelper.file("dir/b1/sub/file");
975+
testHelper.file("dir/a1");
976+
testHelper.file("dir/c1");
977+
testHelper.tick(2000, () => {
978+
// no event fired
979+
w.close();
980+
done();
981+
});
982+
});
983+
});
984+
});
985+
897986
let symlinksSupported = false;
898987
try {
899988
const fs = require("fs");

‎test/helpers/TestHelper.js

+3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ var path = require("path");
55
var rimraf = require("rimraf");
66
var writeFileAtomic = require("write-file-atomic");
77

8+
var watchEventSource = require("../../lib/watchEventSource");
9+
810
require("../../lib/getWatcherManager");
911
var watcherManagerModule =
1012
require.cache[require.resolve("../../lib/getWatcherManager")];
@@ -20,6 +22,7 @@ const checkAllWatcherClosed = () => {
2022
for (const watcherManager of allWatcherManager) {
2123
Array.from(watcherManager.directoryWatchers.keys()).should.be.eql([]);
2224
}
25+
watchEventSource.getNumberOfWatchers().should.be.eql(0);
2326
};
2427

2528
function TestHelper(testdir) {

0 commit comments

Comments
 (0)
Please sign in to comment.