Skip to content

Commit a72f92f

Browse files
committedJun 8, 2022
feature: add async behavior opt-in
1 parent 6472384 commit a72f92f

File tree

3 files changed

+220
-8
lines changed

3 files changed

+220
-8
lines changed
 

‎README.md

+105-1
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,10 @@ some ways superior to) Node.js core streams.
6363
Please read these caveats if you are familiar with node-core streams and
6464
intend to use Minipass streams in your programs.
6565

66+
You can avoid most of these differences entirely (for a very
67+
small performance penalty) by setting `{async: true}` in the
68+
constructor options.
69+
6670
### Timing
6771

6872
Minipass streams are designed to support synchronous use-cases. Thus, data
@@ -82,6 +86,82 @@ This non-deferring approach makes Minipass streams much easier to reason
8286
about, especially in the context of Promises and other flow-control
8387
mechanisms.
8488

89+
Example:
90+
91+
```js
92+
const Minipass = require('minipass')
93+
const stream = new Minipass({ async: true })
94+
stream.on('data', () => console.log('data event'))
95+
console.log('before write')
96+
stream.write('hello')
97+
console.log('after write')
98+
// output:
99+
// before write
100+
// data event
101+
// after write
102+
```
103+
104+
### Exception: Async Opt-In
105+
106+
If you wish to have a Minipass stream with behavior that more
107+
closely mimics Node.js core streams, you can set the stream in
108+
async mode either by setting `async: true` in the constructor
109+
options, or by setting `stream.async = true` later on.
110+
111+
```js
112+
const Minipass = require('minipass')
113+
const asyncStream = new Minipass({ async: true })
114+
asyncStream.on('data', () => console.log('data event'))
115+
console.log('before write')
116+
asyncStream.write('hello')
117+
console.log('after write')
118+
// output:
119+
// before write
120+
// after write
121+
// data event <-- this is deferred until the next tick
122+
```
123+
124+
Switching _out_ of async mode is unsafe, as it could cause data
125+
corruption, and so is not enabled. Example:
126+
127+
```js
128+
const Minipass = require('minipass')
129+
const stream = new Minipass({ encoding: 'utf8' })
130+
stream.on('data', chunk => console.log(chunk))
131+
stream.async = true
132+
console.log('before writes')
133+
stream.write('hello')
134+
setStreamSyncAgainSomehow(stream) // <-- this doesn't actually exist!
135+
stream.write('world')
136+
console.log('after writes')
137+
// hypothetical output would be:
138+
// before writes
139+
// world
140+
// after writes
141+
// hello
142+
// NOT GOOD!
143+
```
144+
145+
To avoid this problem, once set into async mode, any attempt to
146+
make the stream sync again will be ignored.
147+
148+
```js
149+
const Minipass = require('minipass')
150+
const stream = new Minipass({ encoding: 'utf8' })
151+
stream.on('data', chunk => console.log(chunk))
152+
stream.async = true
153+
console.log('before writes')
154+
stream.write('hello')
155+
stream.async = false // <-- no-op, stream already async
156+
stream.write('world')
157+
console.log('after writes')
158+
// actual output:
159+
// before writes
160+
// after writes
161+
// hello
162+
// world
163+
```
164+
85165
### No High/Low Water Marks
86166

87167
Node.js core streams will optimistically fill up a buffer, returning `true`
@@ -97,6 +177,9 @@ If the data has nowhere to go, then `write()` returns false, and the data
97177
sits in a buffer, to be drained out immediately as soon as anyone consumes
98178
it.
99179

180+
Since nothing is ever buffered unnecessarily, there is much less
181+
copying data, and less bookkeeping about buffer capacity levels.
182+
100183
### Hazards of Buffering (or: Why Minipass Is So Fast)
101184

102185
Since data written to a Minipass stream is immediately written all the way
@@ -181,6 +264,8 @@ moving on to the next entry in an archive parse stream, etc.) then be sure
181264
to call `stream.pause()` on creation, and then `stream.resume()` once you
182265
are ready to respond to the `end` event.
183266

267+
However, this is _usually_ not a problem because:
268+
184269
### Emit `end` When Asked
185270

186271
One hazard of immediately emitting `'end'` is that you may not yet have had
@@ -197,6 +282,18 @@ To prevent calling handlers multiple times who would not expect multiple
197282
ends to occur, all listeners are removed from the `'end'` event whenever it
198283
is emitted.
199284

285+
### Emit `error` When Asked
286+
287+
The most recent error object passed to the `'error'` event is
288+
stored on the stream. If a new `'error'` event handler is added,
289+
and an error was previously emitted, then the event handler will
290+
be called immediately (or on `process.nextTick` in the case of
291+
async streams).
292+
293+
This makes it much more difficult to end up trying to interact
294+
with a broken stream, if the error handler is added after an
295+
error was previously emitted.
296+
200297
### Impact of "immediate flow" on Tee-streams
201298

202299
A "tee stream" is a stream piping to multiple destinations:
@@ -221,7 +318,7 @@ src.pipe(dest1) // 'foo' chunk flows to dest1 immediately, and is gone
221318
src.pipe(dest2) // gets nothing!
222319
```
223320

224-
The solution is to create a dedicated tee-stream junction that pipes to
321+
One solution is to create a dedicated tee-stream junction that pipes to
225322
both locations, and then pipe to _that_ instead.
226323

227324
```js
@@ -258,6 +355,13 @@ tee.on('data', handler2)
258355
src.pipe(tee)
259356
```
260357

358+
All of the hazards in this section are avoided by setting `{
359+
async: true }` in the Minipass constructor, or by setting
360+
`stream.async = true` afterwards. Note that this does add some
361+
overhead, so should only be done in cases where you are willing
362+
to lose a bit of performance in order to avoid having to refactor
363+
program logic.
364+
261365
## USAGE
262366

263367
It's a stream! Use it like a stream and it'll most likely do what you

‎index.js

+26-7
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,9 @@ const OBJECTMODE = Symbol('objectMode')
2828
const DESTROYED = Symbol('destroyed')
2929
const EMITDATA = Symbol('emitData')
3030
const EMITEND = Symbol('emitData')
31+
const ASYNC = Symbol('async')
32+
33+
const defer = fn => process.nextTick(fn)
3134

3235
// TODO remove when Node v8 support drops
3336
const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
@@ -82,6 +85,7 @@ module.exports = class Minipass extends Stream {
8285
this[ENCODING] = options && options.encoding || null
8386
if (this[ENCODING] === 'buffer')
8487
this[ENCODING] = null
88+
this[ASYNC] = options && !!options.async || false
8589
this[DECODER] = this[ENCODING] ? new SD(this[ENCODING]) : null
8690
this[EOF] = false
8791
this[EMITTED_END] = false
@@ -121,6 +125,9 @@ module.exports = class Minipass extends Stream {
121125
get objectMode () { return this[OBJECTMODE] }
122126
set objectMode (om) { this[OBJECTMODE] = this[OBJECTMODE] || !!om }
123127

128+
get ['async'] () { return this[ASYNC] }
129+
set ['async'] (a) { this[ASYNC] = this[ASYNC] || !!a }
130+
124131
write (chunk, encoding, cb) {
125132
if (this[EOF])
126133
throw new Error('write after end')
@@ -139,6 +146,8 @@ module.exports = class Minipass extends Stream {
139146
if (!encoding)
140147
encoding = 'utf8'
141148

149+
const fn = this[ASYNC] ? defer : f => f()
150+
142151
// convert array buffers and typed array views into buffers
143152
// at some point in the future, we may want to do the opposite!
144153
// leave strings and buffers as-is
@@ -169,7 +178,7 @@ module.exports = class Minipass extends Stream {
169178
this.emit('readable')
170179

171180
if (cb)
172-
cb()
181+
fn(cb)
173182

174183
return this.flowing
175184
}
@@ -180,7 +189,7 @@ module.exports = class Minipass extends Stream {
180189
if (this[BUFFERLENGTH] !== 0)
181190
this.emit('readable')
182191
if (cb)
183-
cb()
192+
fn(cb)
184193
return this.flowing
185194
}
186195

@@ -208,7 +217,7 @@ module.exports = class Minipass extends Stream {
208217
this.emit('readable')
209218

210219
if (cb)
211-
cb()
220+
fn(cb)
212221

213222
return this.flowing
214223
}
@@ -358,7 +367,10 @@ module.exports = class Minipass extends Stream {
358367
dest.end()
359368
} else {
360369
this.pipes.push(new Pipe(this, dest, opts))
361-
this[RESUME]()
370+
if (this[ASYNC])
371+
defer(() => this[RESUME]())
372+
else
373+
this[RESUME]()
362374
}
363375

364376
return dest
@@ -378,7 +390,10 @@ module.exports = class Minipass extends Stream {
378390
super.emit(ev)
379391
this.removeAllListeners(ev)
380392
} else if (ev === 'error' && this[EMITTED_ERROR]) {
381-
fn.call(this, this[EMITTED_ERROR])
393+
if (this[ASYNC])
394+
defer(() => fn.call(this, this[EMITTED_ERROR]))
395+
else
396+
fn.call(this, this[EMITTED_ERROR])
382397
}
383398
return ret
384399
}
@@ -408,10 +423,14 @@ module.exports = class Minipass extends Stream {
408423
if (ev !== 'error' && ev !== 'close' && ev !== DESTROYED && this[DESTROYED])
409424
return
410425
else if (ev === 'data') {
411-
return data ? this[EMITDATA](data) : false
426+
return !data ? false
427+
: this[ASYNC] ? defer(() => this[EMITDATA](data))
428+
: this[EMITDATA](data)
412429
} else if (ev === 'end') {
413430
// only actual end gets this treatment
414-
return this[EMITTED_END] ? false : this[EMITEND]()
431+
return this[EMITTED_END] ? false
432+
: this[ASYNC] ? defer(() => this[EMITEND]())
433+
: this[EMITEND]()
415434
} else if (ev === 'close') {
416435
this[CLOSED] = true
417436
// don't emit close before 'end' and 'finish'

‎test/async-stream.js

+89
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
const t = require('tap')
2+
const MP = require('../')
3+
4+
t.test('pipe', t => {
5+
const m = new MP({ encoding: 'utf8', async: true })
6+
const d1 = new MP({ encoding: 'utf8' })
7+
const d2 = new MP({ encoding: 'utf8' })
8+
9+
m.pipe(d1)
10+
m.write('hello, ')
11+
m.pipe(d2)
12+
m.write('world')
13+
m.end()
14+
return Promise.all([d1.concat(), d2.concat()])
15+
.then(result => t.strictSame(result, ['hello, world', 'hello, world']))
16+
})
17+
18+
t.test('pipe split', t => {
19+
const m = new MP({ encoding: 'utf8' })
20+
t.equal(m.async, false)
21+
m.async = true
22+
t.equal(m.async, true)
23+
m.async = 'banana'
24+
t.equal(m.async, true)
25+
m.async = false
26+
t.equal(m.async, true, 'cannot make an async stream sync')
27+
const d1 = new MP({ encoding: 'utf8' })
28+
const d2 = new MP({ encoding: 'utf8' })
29+
30+
m.pipe(d1)
31+
m.write('hello, ')
32+
m.pipe(d2)
33+
setTimeout(() => {
34+
m.write('world')
35+
m.end()
36+
})
37+
return Promise.all([d1.concat(), d2.concat()])
38+
.then(result => t.strictSame(result, ['hello, world', 'hello, world']))
39+
})
40+
41+
t.test('data event', t => {
42+
const m = new MP({ encoding: 'utf8', async: true })
43+
const d1 = new MP({ encoding: 'utf8' })
44+
45+
const out1 = []
46+
m.on('data', c => out1.push(c))
47+
m.write('hello, ')
48+
const out2 = []
49+
m.on('data', c => out2.push(c))
50+
m.pipe(d1)
51+
m.end('world!')
52+
return d1.concat().then(res => {
53+
t.equal(res, 'hello, world!')
54+
t.equal(out1.join(''), 'hello, world!')
55+
t.equal(out2.join(''), 'hello, world!')
56+
})
57+
})
58+
59+
t.test('data event split', t => {
60+
const m = new MP({ encoding: 'utf8', async: true })
61+
const d1 = new MP({ encoding: 'utf8' })
62+
63+
const out1 = []
64+
m.on('data', c => out1.push(c))
65+
m.write('hello, ')
66+
const out2 = []
67+
m.on('data', c => out2.push(c))
68+
m.pipe(d1)
69+
setTimeout(() => m.end('world!'))
70+
return d1.concat().then(res => {
71+
t.equal(res, 'hello, world!')
72+
t.equal(out1.join(''), 'hello, world!')
73+
t.equal(out2.join(''), 'hello, world!')
74+
})
75+
})
76+
77+
t.test('defer error event', t => {
78+
const m = new MP()
79+
try { m.emit('error', new Error('poop')) } catch (_) {}
80+
m.async = true
81+
let calledErrorHandler = false
82+
m.on('error', er => {
83+
t.equal(calledErrorHandler, false)
84+
t.match(er, { message: 'poop' })
85+
calledErrorHandler = true
86+
t.end()
87+
})
88+
t.equal(calledErrorHandler, false)
89+
})

0 commit comments

Comments
 (0)
Please sign in to comment.