Skip to content

Commit

Permalink
fix: remove in-memory buffering in favor of full time streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
nlf committed May 18, 2022
1 parent 06cee12 commit ec2db21
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 193 deletions.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -160,7 +160,7 @@ The default cache manager also adds the following headers to cached responses:

* `X-Local-Cache`: Path to the cache the content was found in
* `X-Local-Cache-Key`: Unique cache entry key for this response
* `X-Local-Cache-Mode`: Either `stream` or `buffer` to indicate how the response was read from cacache
* `X-Local-Cache-Mode`: Always `stream` to indicate how the response was read from cacache
* `X-Local-Cache-Hash`: Specific integrity hash for the cached entry
* `X-Local-Cache-Status`: One of `miss`, `hit`, `stale`, `revalidated`, `updated`, or `skip` to signal how the response was created
* `X-Local-Cache-Time`: UTCString of the cache insertion time for the entry
Expand Down
114 changes: 29 additions & 85 deletions lib/cache/entry.js
@@ -1,6 +1,5 @@
const { Request, Response } = require('minipass-fetch')
const Minipass = require('minipass')
const MinipassCollect = require('minipass-collect')
const MinipassFlush = require('minipass-flush')
const MinipassPipeline = require('minipass-pipeline')
const cacache = require('cacache')
Expand All @@ -12,10 +11,6 @@ const remote = require('../remote.js')

const hasOwnProperty = (obj, prop) => Object.prototype.hasOwnProperty.call(obj, prop)

// maximum amount of data we will buffer into memory
// if we'll exceed this, we switch to streaming
const MAX_MEM_SIZE = 5 * 1024 * 1024 // 5MB

// allow list for request headers that will be written to the cache index
// note: we will also store any request headers
// that are named in a response's vary header
Expand Down Expand Up @@ -256,13 +251,10 @@ class CacheEntry {
}

const size = this.response.headers.get('content-length')
const fitsInMemory = !!size && Number(size) < MAX_MEM_SIZE
const shouldBuffer = this.options.memoize !== false && fitsInMemory
const cacheOpts = {
algorithms: this.options.algorithms,
metadata: getMetadata(this.request, this.response, this.options),
size,
memoize: fitsInMemory && this.options.memoize,
}

let body = null
Expand All @@ -281,43 +273,22 @@ class CacheEntry {
},
}))

let abortStream, onResume
if (shouldBuffer) {
// if the result fits in memory, use a collect stream to gather
// the response and write it to cacache while also passing it through
// to the user
onResume = () => {
const collector = new MinipassCollect.PassThrough()
abortStream = collector
collector.on('collect', (data) => {
// TODO if the cache write fails, log a warning but return the response anyway
cacache.put(this.options.cachePath, this.key, data, cacheOpts)
.then(cacheWriteResolve, cacheWriteReject)
})
body.unshift(collector)
body.unshift(this.response.body)
}
} else {
// if it does not fit in memory, create a tee stream and use
// that to pipe to both the cache and the user simultaneously
onResume = () => {
const tee = new Minipass()
const cacheStream = cacache.put.stream(this.options.cachePath, this.key, cacheOpts)
abortStream = cacheStream
tee.pipe(cacheStream)
// TODO if the cache write fails, log a warning but return the response anyway
cacheStream.promise().then(cacheWriteResolve, cacheWriteReject)
body.unshift(tee)
body.unshift(this.response.body)
}
let abortStream
const onResume = () => {
const tee = new Minipass()
const cacheStream = cacache.put.stream(this.options.cachePath, this.key, cacheOpts)
abortStream = cacheStream
tee.pipe(cacheStream)
// TODO if the cache write fails, log a warning but return the response anyway
cacheStream.promise().then(cacheWriteResolve, cacheWriteReject)
body.unshift(tee)
body.unshift(this.response.body)
}

body.once('resume', onResume)
body.once('end', () => body.removeListener('resume', onResume))
this.response.body.on('error', (err) => {
// the abortStream will either be a MinipassCollect if we buffer
// or a cacache write stream, either way be sure to listen for
// errors from the actual response and avoid writing data that we
// listen for errors from the actual response and avoid writing data that we
// know to be invalid to the cache
abortStream.destroy(err)
})
Expand All @@ -331,7 +302,7 @@ class CacheEntry {
// the header anyway
this.response.headers.set('x-local-cache', encodeURIComponent(this.options.cachePath))
this.response.headers.set('x-local-cache-key', encodeURIComponent(this.key))
this.response.headers.set('x-local-cache-mode', shouldBuffer ? 'buffer' : 'stream')
this.response.headers.set('x-local-cache-mode', 'stream')
this.response.headers.set('x-local-cache-status', status)
this.response.headers.set('x-local-cache-time', new Date().toISOString())
const newResponse = new Response(body, {
Expand All @@ -346,9 +317,6 @@ class CacheEntry {
// use the cached data to create a response and return it
async respond (method, options, status) {
let response
const size = Number(this.response.headers.get('content-length'))
const fitsInMemory = !!size && size < MAX_MEM_SIZE
const shouldBuffer = this.options.memoize !== false && fitsInMemory
if (method === 'HEAD' || [301, 308].includes(this.response.status)) {
// if the request is a HEAD, or the response is a redirect,
// then the metadata in the entry already includes everything
Expand All @@ -358,52 +326,28 @@ class CacheEntry {
// we're responding with a full cached response, so create a body
// that reads from cacache and attach it to a new Response
const body = new Minipass()
const removeOnResume = () => body.removeListener('resume', onResume)
let onResume
if (shouldBuffer) {
onResume = async () => {
removeOnResume()
try {
const content = await cacache.get.byDigest(
const onResume = () => {
const cacheStream = cacache.get.stream.byDigest(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
)
cacheStream.on('error', async (err) => {
cacheStream.pause()
if (err.code === 'EINTEGRITY') {
await cacache.rm.content(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
)
body.end(content)
} catch (err) {
if (err.code === 'EINTEGRITY') {
await cacache.rm.content(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
)
}
if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') {
await CacheEntry.invalidate(this.request, this.options)
}
body.emit('error', err)
}
}
} else {
onResume = () => {
const cacheStream = cacache.get.stream.byDigest(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
)
cacheStream.on('error', async (err) => {
cacheStream.pause()
if (err.code === 'EINTEGRITY') {
await cacache.rm.content(
this.options.cachePath, this.entry.integrity, { memoize: this.options.memoize }
)
}
if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') {
await CacheEntry.invalidate(this.request, this.options)
}
body.emit('error', err)
cacheStream.resume()
})
cacheStream.pipe(body)
}
if (err.code === 'ENOENT' || err.code === 'EINTEGRITY') {
await CacheEntry.invalidate(this.request, this.options)
}
body.emit('error', err)
cacheStream.resume()
})
cacheStream.pipe(body)
}

body.once('resume', onResume)
body.once('end', removeOnResume)
body.once('end', () => body.removeListener('resume', onResume))
response = new Response(body, {
url: this.entry.metadata.url,
counter: options.counter,
Expand All @@ -417,7 +361,7 @@ class CacheEntry {
response.headers.set('x-local-cache', encodeURIComponent(this.options.cachePath))
response.headers.set('x-local-cache-hash', encodeURIComponent(this.entry.integrity))
response.headers.set('x-local-cache-key', encodeURIComponent(this.key))
response.headers.set('x-local-cache-mode', shouldBuffer ? 'buffer' : 'stream')
response.headers.set('x-local-cache-mode', 'stream')
response.headers.set('x-local-cache-status', status)
response.headers.set('x-local-cache-time', new Date(this.entry.metadata.time).toUTCString())
return response
Expand Down
108 changes: 2 additions & 106 deletions test/cache.js
Expand Up @@ -75,7 +75,7 @@ t.test('no match, fetches and replies', async (t) => {
t.equal(res.headers.get('content-length'), `${CONTENT.length}`, 'kept content-length')
t.equal(res.headers.get('x-local-cache'), encodeURIComponent(dir), 'has cache dir')
t.equal(res.headers.get('x-local-cache-key'), encodeURIComponent(reqKey), 'has cache key')
t.equal(res.headers.get('x-local-cache-mode'), 'buffer', 'should buffer store')
t.equal(res.headers.get('x-local-cache-mode'), 'stream', 'should stream store')
t.equal(res.headers.get('x-local-cache-status'), 'miss', 'identifies as cache miss')
t.ok(res.headers.has('x-local-cache-time'), 'has cache time')
t.equal(res.headers.get('x-foo'), 'something', 'original response has all headers')
Expand Down Expand Up @@ -197,7 +197,7 @@ t.test('cache hit, no revalidation', async (t) => {
t.equal(res.headers.get('x-local-cache-status'), 'hit', 'got a cache hit')
t.equal(res.headers.get('x-local-cache-key'), encodeURIComponent(reqKey),
'got the right cache key')
t.equal(res.headers.get('x-local-cache-mode'), 'buffer', 'should buffer read')
t.equal(res.headers.get('x-local-cache-mode'), 'stream', 'should stream read')
t.equal(res.headers.get('x-local-cache-hash'), encodeURIComponent(INTEGRITY),
'has the right hash')
// just make sure x-local-cache-time is set, no need to assert its value
Expand Down Expand Up @@ -1386,42 +1386,6 @@ t.test('EINTEGRITY errors streaming from cache propagate to response body', asyn
t.ok(srv.isDone(), 'req is fulfilled')
})

t.test('EINTEGRITY errors reading from cache propagate to response body', async (t) => {
const srv = nock(HOST)
.get('/test')
.twice()
.reply(200, CONTENT, {
...getHeaders(CONTENT),
etag: '"beefc0ffee"',
})

const dir = t.testdir()
const res = await fetch(`${HOST}/test`, { cachePath: dir })
await res.buffer() // drain it immediately so it stores to the cache

const hexIntegrity = ssri.fromData(CONTENT).hexDigest()
const cachedContent = join(dir, 'content-v2', 'sha512', hexIntegrity.slice(0, 2),
hexIntegrity.slice(2, 4), hexIntegrity.slice(4))
t.ok(fs.existsSync(cachedContent), 'cache file is present')
// delete the real content, and write garbage in its place
fs.unlinkSync(cachedContent)
fs.writeFileSync(cachedContent, 'invalid data', { flag: 'wx' })

const cachedRes = await fetch(`${HOST}/test`, { cachePath: dir })
t.equal(cachedRes.status, 200, 'got a success response')
t.equal(cachedRes.headers.get('x-local-cache-status'), 'hit', 'got a cache hit')
await t.rejects(cachedRes.buffer(), { code: 'EINTEGRITY' }, 'consuming payload rejects')

t.notOk(fs.existsSync(cachedContent), 'cached content was removed')
const verifyRes = await fetch(`${HOST}/test`, { cachePath: dir })
t.equal(verifyRes.status, 200, 'got success status')
t.equal(verifyRes.headers.get('x-local-cache-mode'), 'buffer', 'used a buffer to respond')
t.equal(verifyRes.headers.get('x-local-cache-status'), 'miss',
'cache miss because index was removed')
await verifyRes.buffer()
t.ok(srv.isDone(), 'req has fulfilled')
})

t.test('ENOENT errors streaming from cache propagate to response body', async (t) => {
const desiredSize = 5 * 1024 * 1024 // 5MB, currently hard coded in lib/cache/entry.js
const count = Math.ceil(desiredSize / CONTENT.length) + 1
Expand Down Expand Up @@ -1458,39 +1422,6 @@ t.test('ENOENT errors streaming from cache propagate to response body', async (t
t.ok(srv.isDone(), 'req has fulfilled')
})

t.test('ENOENT errors reading from cache propagate to response body', async (t) => {
const srv = nock(HOST)
.get('/test')
.twice()
.reply(200, CONTENT, {
...getHeaders(CONTENT),
etag: '"beef"',
})

const dir = t.testdir()
const res = await fetch(`${HOST}/test`, { cachePath: dir })
await res.buffer()

const hexIntegrity = ssri.fromData(CONTENT).hexDigest()
const cachedContent = join(dir, 'content-v2', 'sha512', hexIntegrity.slice(0, 2),
hexIntegrity.slice(2, 4), hexIntegrity.slice(4))
t.ok(fs.existsSync(cachedContent), 'cache file is present')
// delete the content entirely
fs.unlinkSync(cachedContent)

const cachedRes = await fetch(`${HOST}/test`, { cachePath: dir })
t.equal(cachedRes.status, 200, 'got a success response')
t.equal(cachedRes.headers.get('x-local-cache-status'), 'hit', 'still returns as a hit')
await t.rejects(cachedRes.buffer(), { code: 'ENOENT' }, 'body rejects with ENOENT')

const verifyRes = await fetch(`${HOST}/test`, { cachePath: dir })
t.equal(verifyRes.headers.get('x-local-cache-status'), 'miss', 'went back to a cache miss')
t.equal(verifyRes.headers.get('x-local-cache-mode'), 'buffer', 'used a buffer to respond')
await verifyRes.buffer()

t.ok(srv.isDone())
})

t.test('generic errors streaming from cache propagate to response body', async (t) => {
const desiredSize = 5 * 1024 * 1024 // 5MB, currently hard coded in lib/cache/entry.js
const count = Math.ceil(desiredSize / CONTENT.length) + 1
Expand Down Expand Up @@ -1531,38 +1462,3 @@ t.test('generic errors streaming from cache propagate to response body', async (
t.equal(cachedRes.headers.get('x-local-cache-mode'), 'stream', 'was a streaming response')
await t.rejects(cachedRes.buffer(), { message: 'broken stream' }, 'consuming payload rejects')
})

t.test('generic errors reading from cache propagate to response body', async (t) => {
const srv = nock(HOST)
.get('/test')
.reply(200, CONTENT, {
...getHeaders(CONTENT),
etag: '"beefc0ffee"',
})

// hijack cacache.get.byDigest
const realGet = cacache.get.byDigest
t.teardown(() => {
cacache.get.byDigest = realGet
})
cacache.get.byDigest = (cachePath, integrity) => {
return Promise.reject(new Error('broken read'))
}

const dir = t.testdir()
const res = await fetch(`${HOST}/test`, { cachePath: dir })
await res.buffer() // drain it immediately so it stores to the cache

t.ok(srv.isDone(), 'req has fulfilled')

const hexIntegrity = ssri.fromData(CONTENT).hexDigest()
const cachedContent = join(dir, 'content-v2', 'sha512', hexIntegrity.slice(0, 2),
hexIntegrity.slice(2, 4), hexIntegrity.slice(4))
t.ok(fs.existsSync(cachedContent), 'cache file is present')

const cachedRes = await fetch(`${HOST}/test`, { cachePath: dir })
t.equal(cachedRes.status, 200, 'got a success response')
t.equal(cachedRes.headers.get('x-local-cache-status'), 'hit', 'got a cache hit')
t.equal(cachedRes.headers.get('x-local-cache-mode'), 'buffer', 'used a buffered response')
await t.rejects(cachedRes.buffer(), { message: 'broken read' }, 'consuming payload rejects')
})
2 changes: 1 addition & 1 deletion test/cacheable-response-invalid-integrity.js
Expand Up @@ -38,5 +38,5 @@ t.test('cacheable request with invalid integrity', async t => {
await t.rejects(res.json(), { code: 'EINTEGRITY' })
t.ok(req.isDone())
const dir = await readdir(cache)
t.same(dir, [], 'did not write to cache')
t.same(dir, ['tmp'], 'did not write to cache, only temp')
})

0 comments on commit ec2db21

Please sign in to comment.