Skip to content

Commit

Permalink
WriteEntry backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
isaacs committed Aug 3, 2021
1 parent ba73f5e commit be89aaf
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 7 deletions.
28 changes: 22 additions & 6 deletions lib/write-entry.js
Expand Up @@ -21,6 +21,8 @@ const OPENFILE = Symbol('openfile')
const ONOPENFILE = Symbol('onopenfile')
const CLOSE = Symbol('close')
const MODE = Symbol('mode')
const AWAITDRAIN = Symbol('awaitDrain')
const ONDRAIN = Symbol('ondrain')
const warner = require('./warn-mixin.js')
const winchars = require('./winchars.js')
const stripAbsolutePath = require('./strip-absolute-path.js')
Expand Down Expand Up @@ -232,7 +234,7 @@ const WriteEntry = warner(class WriteEntry extends MiniPass {
this.pos = 0
this.remain = this.stat.size
this.length = this.buf.length
this[READ](this.stat.size)
this[READ]()
}

[READ] () {
Expand Down Expand Up @@ -284,13 +286,23 @@ const WriteEntry = warner(class WriteEntry extends MiniPass {

const writeBuf = this.offset === 0 && bytesRead === this.buf.length ?
this.buf : this.buf.slice(this.offset, this.offset + bytesRead)
this.remain -= bytesRead
this.blockRemain -= bytesRead
this.pos += bytesRead
this.offset += bytesRead
this.remain -= writeBuf.length
this.blockRemain -= writeBuf.length
this.pos += writeBuf.length
this.offset += writeBuf.length

const flushed = this.write(writeBuf)
if (!flushed)
this[AWAITDRAIN](() => this[ONDRAIN]())
else
this[ONDRAIN]()
}

this.write(writeBuf)
[AWAITDRAIN] (cb) {
this.once('drain', cb)
}

[ONDRAIN] () {
if (!this.remain) {
if (this.blockRemain)
this.write(Buffer.alloc(this.blockRemain))
Expand Down Expand Up @@ -339,6 +351,10 @@ class WriteEntrySync extends WriteEntry {
}
}

[AWAITDRAIN] (cb) {
cb()
}

[CLOSE] (cb) {
fs.closeSync(this.fd)
cb()
Expand Down
3 changes: 2 additions & 1 deletion test/write-entry.js
Expand Up @@ -254,6 +254,7 @@ t.test('zero-byte file, but close fails', t => {
t.match(er, { message: 'poop' })
t.end()
})
ws.resume()
})

t.test('hardlinks', t => {
Expand Down Expand Up @@ -597,7 +598,7 @@ t.test('read overflow expectation', t => {
t.throws(_ => new WriteEntry.Sync(f, { cwd: files, maxReadSize: 2 }), expect)
new WriteEntry(f, { cwd: files, maxReadSize: 2 }).on('error', er => {
t.match(er, expect)
})
}).resume()
})

t.test('short reads', t => {
Expand Down

0 comments on commit be89aaf

Please sign in to comment.