Skip to content

Commit 9c5113e

Browse files
committedJun 20, 2022
feature: add unpipe(dest) method
PR-URL: #39 Credit: @isaacs Close: #39 Reviewed-by: @isaacs
1 parent 547db29 commit 9c5113e

File tree

3 files changed

+70
-7
lines changed

3 files changed

+70
-7
lines changed
 

‎README.md

+10-6
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,6 @@ from this stream via `'data'` events or by calling `pipe()` into some other
1717
stream. Calling `read()` requires the buffer to be flattened in some
1818
cases, which requires copying memory.
1919

20-
There is also no `unpipe()` method. Once you start piping, there is no
21-
stopping it!
22-
2320
If you set `objectMode: true` in the options, then whatever is written will
2421
be emitted. Otherwise, it'll do a minimal amount of Buffer copying to
2522
ensure proper Streams semantics when `read(n)` is called.
@@ -383,6 +380,10 @@ mp.end('bar')
383380
by default if you write() something other than a string or Buffer at any
384381
point. Setting `objectMode: true` will prevent setting any encoding
385382
value.
383+
* `async` Defaults to `false`. Set to `true` to defer data
384+
emission until next tick. This reduces performance slightly,
385+
but makes Minipass streams use timing behavior closer to Node
386+
core streams. See [Timing](#timing) for more details.
386387

387388
### API
388389

@@ -404,9 +405,12 @@ streams.
404405
from being emitted for empty streams until the stream is resumed.
405406
* `resume()` - Resume the stream. If there's data in the buffer, it is all
406407
discarded. Any buffered events are immediately emitted.
407-
* `pipe(dest)` - Send all output to the stream provided. There is no way
408-
to unpipe. When data is emitted, it is immediately written to any and
409-
all pipe destinations.
408+
* `pipe(dest)` - Send all output to the stream provided. When
409+
data is emitted, it is immediately written to any and all pipe
410+
destinations. (Or written on next tick in `async` mode.)
411+
* `unpipe(dest)` - Stop piping to the destination stream. This
412+
is immediate, meaning that any asynchronously queued data will
413+
_not_ make it to the destination when running in `async` mode.
410414
* `on(ev, fn)`, `emit(ev, fn)` - Minipass streams are EventEmitters. Some
411415
events are given special treatment, however. (See below under "events".)
412416
* `promise()` - Returns a Promise that resolves when the stream emits

‎index.js

+12-1
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,13 @@ class Pipe {
6363
this.ondrain = () => src[RESUME]()
6464
dest.on('drain', this.ondrain)
6565
}
66+
unpipe () {
67+
this.dest.removeListener('drain', this.ondrain)
68+
}
6669
end () {
70+
this.unpipe()
6771
if (this.opts.end)
6872
this.dest.end()
69-
this.dest.removeListener('drain', this.ondrain)
7073
}
7174
}
7275

@@ -377,6 +380,14 @@ module.exports = class Minipass extends Stream {
377380
return dest
378381
}
379382

383+
unpipe (dest) {
384+
const p = this.pipes.find(p => p.dest === dest)
385+
if (p) {
386+
this.pipes.splice(this.pipes.indexOf(p), 1)
387+
p.unpipe()
388+
}
389+
}
390+
380391
addListener (ev, fn) {
381392
return this.on(ev, fn)
382393
}

‎test/unpipe.js

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
const t = require('tap')
2+
const Minipass = require('../')
3+
4+
const src = new Minipass({ encoding: 'utf8' })
5+
const dest = new Minipass({ encoding: 'utf8' })
6+
const dest2 = new Minipass({ encoding: 'utf8' })
7+
const destOut = []
8+
const dest2Out = []
9+
const srcOut = []
10+
11+
src.pipe(dest)
12+
src.pipe(dest2)
13+
14+
dest.on('data', c => destOut.push(c))
15+
dest2.on('data', c => dest2Out.push(c))
16+
src.on('data', c => srcOut.push(c))
17+
18+
src.write('hello')
19+
t.strictSame(destOut, ['hello'])
20+
t.strictSame(dest2Out, ['hello'])
21+
t.strictSame(srcOut, ['hello'])
22+
23+
t.match(src.pipes, [
24+
{ dest },
25+
{ dest: dest2 },
26+
])
27+
28+
src.unpipe(dest)
29+
30+
t.match(src.pipes, [
31+
{ dest: dest2 },
32+
])
33+
34+
src.unpipe(dest) // no-op
35+
t.match(src.pipes, [
36+
{ dest: dest2 },
37+
])
38+
39+
40+
src.write('world')
41+
t.strictSame(destOut, ['hello'])
42+
t.strictSame(dest2Out, ['hello', 'world'])
43+
t.strictSame(srcOut, ['hello', 'world'])
44+
45+
src.end()
46+
t.equal(dest.emittedEnd, false)
47+
t.equal(src.emittedEnd, true)
48+
t.equal(dest2.emittedEnd, true)

0 commit comments

Comments
 (0)
Please sign in to comment.