Skip to content

Commit 6d80111

Browse files
committedJun 10, 2022
fix: async streams only end destinations 1 time
1 parent b596282 commit 6d80111

File tree

2 files changed

+33
-6
lines changed

2 files changed

+33
-6
lines changed
 

‎index.js

+13-6
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,11 @@ const BUFFERSHIFT = Symbol('bufferShift')
2727
const OBJECTMODE = Symbol('objectMode')
2828
const DESTROYED = Symbol('destroyed')
2929
const EMITDATA = Symbol('emitData')
30-
const EMITEND = Symbol('emitData')
30+
const EMITEND = Symbol('emitEnd')
31+
const EMITEND2 = Symbol('emitEnd2')
3132
const ASYNC = Symbol('async')
3233

33-
const defer = fn => process.nextTick(fn)
34+
const defer = fn => Promise.resolve().then(fn)
3435

3536
// TODO remove when Node v8 support drops
3637
const doIter = global._MP_NO_ITERATOR_SYMBOLS_ !== '1'
@@ -427,10 +428,7 @@ module.exports = class Minipass extends Stream {
427428
: this[ASYNC] ? defer(() => this[EMITDATA](data))
428429
: this[EMITDATA](data)
429430
} else if (ev === 'end') {
430-
// only actual end gets this treatment
431-
return this[EMITTED_END] ? false
432-
: this[ASYNC] ? defer(() => this[EMITEND]())
433-
: this[EMITEND]()
431+
return this[EMITEND]()
434432
} else if (ev === 'close') {
435433
this[CLOSED] = true
436434
// don't emit close before 'end' and 'finish'
@@ -471,9 +469,18 @@ module.exports = class Minipass extends Stream {
471469
}
472470

473471
[EMITEND] () {
472+
if (this[EMITTED_END])
473+
return
474+
474475
this[EMITTED_END] = true
475476
this.readable = false
477+
if (this[ASYNC])
478+
defer(() => this[EMITEND2]())
479+
else
480+
this[EMITEND2]()
481+
}
476482

483+
[EMITEND2] () {
477484
if (this[DECODER]) {
478485
const data = this[DECODER].end()
479486
if (data) {

‎test/async-duplicate-end.js

+20
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
const Minipass = require('../')
2+
const t = require('tap')
3+
4+
t.test('async pipes should only end one time', t => {
5+
const m = new Minipass({ async: true })
6+
let ended = 0
7+
const d = new Minipass()
8+
d.end = () => {
9+
ended++
10+
Minipass.prototype.end.call(d)
11+
}
12+
m.pipe(d)
13+
m.end()
14+
m.end()
15+
m.end()
16+
setTimeout(() => {
17+
t.equal(ended, 1)
18+
t.end()
19+
})
20+
})

0 commit comments

Comments
 (0)
Please sign in to comment.