Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This is going to be used to avoid harmful race conditions in parallel extraction operations. For example, a Link to a/b requires that the File at a/b is present. But, if they are run in parallel, then the File might not be written before the Link tries to create, and it'll fail with ENOENT. This gets worse if there's a Directory at a/b/c that unlinks the File so that it can create a dir.
- Loading branch information
Showing
2 changed files
with
181 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,125 @@ | ||
// A path exclusive reservation system | ||
// reserve([list, of, paths], fn) | ||
// When the fn is first in line for all its paths, it | ||
// is called with a cb that clears the reservation. | ||
// | ||
// Used by async unpack to avoid clobbering paths in use, | ||
// while still allowing maximal safe parallelization. | ||
|
||
const assert = require('assert') | ||
|
||
module.exports = () => { | ||
// path => [function or Set] | ||
// A Set object means a directory reservation | ||
// A fn is a direct reservation on that path | ||
const queues = new Map() | ||
|
||
// fn => {paths:[path,...], dirs:[path, ...]} | ||
const reservations = new Map() | ||
|
||
// return a set of parent dirs for a given path | ||
const { join } = require('path') | ||
const getDirs = path => | ||
join(path).split(/[\\\/]/).slice(0, -1).reduce((set, path) => | ||
set.length ? set.concat(join(set[set.length-1], path)) : [path], []) | ||
|
||
// functions currently running | ||
const running = new Set() | ||
|
||
// return the queues for each path the function cares about | ||
// fn => {paths, dirs} | ||
const getQueues = fn => { | ||
const res = reservations.get(fn) | ||
/* istanbul ignore if - unpossible */ | ||
if (!res) | ||
throw new Error('function does not have any path reservations') | ||
return { | ||
paths: res.paths.map(path => queues.get(path)), | ||
dirs: [...res.dirs].map(path => queues.get(path)), | ||
} | ||
} | ||
|
||
// check if fn is first in line for all its paths, and is | ||
// included in the first set for all its dir queues | ||
const check = fn => { | ||
const {paths, dirs} = getQueues(fn) | ||
return paths.every(q => q[0] === fn) && | ||
dirs.every(q => q[0] instanceof Set && q[0].has(fn)) | ||
} | ||
|
||
// run the function if it's first in line and not already running | ||
const run = fn => { | ||
if (running.has(fn) || !check(fn)) | ||
return false | ||
running.add(fn) | ||
fn(() => clear(fn)) | ||
return true | ||
} | ||
|
||
const clear = fn => { | ||
if (!running.has(fn)) | ||
return false | ||
|
||
const { paths, dirs } = reservations.get(fn) | ||
const next = new Set() | ||
|
||
paths.forEach(path => { | ||
const q = queues.get(path) | ||
assert.equal(q[0], fn) | ||
if (q.length === 1) | ||
queues.delete(path) | ||
else { | ||
q.shift() | ||
if (typeof q[0] === 'function') | ||
next.add(q[0]) | ||
else | ||
q[0].forEach(fn => next.add(fn)) | ||
} | ||
}) | ||
|
||
dirs.forEach(dir => { | ||
const q = queues.get(dir) | ||
assert(q[0] instanceof Set) | ||
if (q[0].size === 1 && q.length === 1) { | ||
queues.delete(dir) | ||
} else if (q[0].size === 1) { | ||
q.shift() | ||
|
||
// must be a function or else the Set would've been reused | ||
next.add(q[0]) | ||
} else | ||
q[0].delete(fn) | ||
}) | ||
running.delete(fn) | ||
|
||
next.forEach(fn => run(fn)) | ||
return true | ||
} | ||
|
||
const reserve = (paths, fn) => { | ||
const dirs = new Set( | ||
paths.map(path => getDirs(path)).reduce((a, b) => a.concat(b)) | ||
) | ||
reservations.set(fn, {dirs, paths}) | ||
paths.forEach(path => { | ||
const q = queues.get(path) | ||
if (!q) | ||
queues.set(path, [fn]) | ||
else | ||
q.push(fn) | ||
}) | ||
dirs.forEach(dir => { | ||
const q = queues.get(dir) | ||
if (!q) | ||
queues.set(dir, [new Set([fn])]) | ||
else if (q[q.length-1] instanceof Set) | ||
q[q.length-1].add(fn) | ||
else | ||
q.push(new Set([fn])) | ||
}) | ||
|
||
return run(fn) | ||
} | ||
|
||
return { check, reserve } | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
const t = require('tap') | ||
const { reserve } = require('../lib/path-reservations.js')() | ||
|
||
t.test('basic race', t => { | ||
// simulate the race conditions we care about | ||
let didFile = false | ||
const file = done => { | ||
t.equal(didFile, false, 'only call file once') | ||
didFile = true | ||
t.equal(didLink, false, 'do file before link') | ||
setTimeout(done) | ||
} | ||
|
||
let didLink = false | ||
const link = done => { | ||
t.equal(didLink, false, 'only call once') | ||
t.equal(didFile, true, 'do file before link') | ||
didLink = true | ||
// make sure this one is super duper cleared lol | ||
// the subsequent calls are no-ops, but verify as much | ||
done() | ||
done() | ||
done() | ||
} | ||
|
||
let didDir = false | ||
const dir = done => { | ||
t.equal(didDir, false, 'only do dir once') | ||
t.equal(didLink, true, 'do link before dir') | ||
didDir = true | ||
done() | ||
} | ||
|
||
let didDir2 = false | ||
const dir2 = done => { | ||
t.equal(didDir, true, 'did dir before dir2') | ||
t.equal(didDir2, false, 'only do dir2 once') | ||
didDir2 = true | ||
done() | ||
} | ||
|
||
let didDir3 = false | ||
const dir3 = done => { | ||
t.equal(didDir2, true, 'did dir2 before dir3') | ||
t.equal(didDir3, false, 'only do dir3 once') | ||
didDir3 = true | ||
done() | ||
t.end() | ||
} | ||
|
||
t.ok(reserve(['a/b/c/d'], file), 'file starts right away') | ||
t.notOk(reserve(['a/b/c/d', 'a/b/e'], link), 'link waits') | ||
t.notOk(reserve(['a/b/e/f'], dir), 'dir waits') | ||
t.notOk(reserve(['a/b'], dir2), 'dir2 waits') | ||
t.notOk(reserve(['a/b/x'], dir3), 'dir3 waits') | ||
}) |