Skip to content

Commit

Permalink
feat: allow reuse of external integrity stream
Browse files Browse the repository at this point in the history
  • Loading branch information
nlf authored and wraithgar committed Jun 1, 2022
1 parent 63e1d51 commit fdb9e5a
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 24 deletions.
39 changes: 21 additions & 18 deletions lib/fetcher.js
Expand Up @@ -18,6 +18,7 @@ const removeTrailingSlashes = require('./util/trailing-slashes.js')
const getContents = require('@npmcli/installed-package-contents')
const readPackageJsonFast = require('read-package-json-fast')
const readPackageJson = promisify(require('read-package-json'))
const Minipass = require('minipass')

// we only change ownership on unix platforms, and only if uid is 0
const selfOwner = process.getuid && process.getuid() === 0 ? {
Expand Down Expand Up @@ -219,40 +220,42 @@ class FetcherBase {
}

[_istream] (stream) {
// everyone will need one of these, either for verifying or calculating
// We always set it, because we have might only have a weak legacy hex
// sha1 in the packument, and this MAY upgrade it to a stronger algo.
// If we had an integrity, and it doesn't match, then this does not
// override that error; the istream will raise the error before it
// gets to the point of re-setting the integrity.
const istream = ssri.integrityStream(this.opts)
istream.on('integrity', i => this.integrity = i)
stream.on('error', er => istream.emit('error', er))

// if not caching this, just pipe through to the istream and return it
// if not caching this, just return it
if (!this.opts.cache || !this[_cacheFetches]) {
// instead of creating a new integrity stream, we only piggyback on the
// provided stream's events
if (stream.hasIntegrityEmitter) {
stream.on('integrity', i => this.integrity = i)
return stream
}

const istream = ssri.integrityStream(this.opts)
istream.on('integrity', i => this.integrity = i)
stream.on('error', err => istream.emit('error', err))
return stream.pipe(istream)
}

// we have to return a stream that gets ALL the data, and proxies errors,
// but then pipe from the original tarball stream into the cache as well.
// To do this without losing any data, and since the cacache put stream
// is not a passthrough, we have to pipe from the original stream into
// the cache AFTER we pipe into the istream. Since the cache stream
// the cache AFTER we pipe into the middleStream. Since the cache stream
// has an asynchronous flush to write its contents to disk, we need to
// defer the istream end until the cache stream ends.
stream.pipe(istream, { end: false })
// defer the middleStream end until the cache stream ends.
const middleStream = new Minipass()
stream.on('error', err => middleStream.emit('error', err))
stream.pipe(middleStream, { end: false })
const cstream = cacache.put.stream(
this.opts.cache,
`pacote:tarball:${this.from}`,
this.opts
)
cstream.on('integrity', i => this.integrity = i)
cstream.on('error', err => stream.emit('error', err))
stream.pipe(cstream)
// defer istream end until after cstream
// cache write errors should not crash the fetch, this is best-effort.
cstream.promise().catch(() => {}).then(() => istream.end())

return istream
cstream.promise().catch(() => {}).then(() => middleStream.end())
return middleStream
}

pickIntegrityAlgorithm () {
Expand Down
17 changes: 11 additions & 6 deletions lib/remote.js
Expand Up @@ -31,23 +31,28 @@ class RemoteFetcher extends Fetcher {

[_tarballFromResolved] () {
const stream = new Minipass()
stream.hasIntegrityEmitter = true

const fetchOpts = {
...this.opts,
headers: this[_headers](),
spec: this.spec,
integrity: this.integrity,
algorithms: [this.pickIntegrityAlgorithm()],
}
fetch(this.resolved, fetchOpts).then(res => {
const hash = res.headers.get('x-local-cache-hash')
if (hash) {
this.integrity = decodeURIComponent(hash)
}

fetch(this.resolved, fetchOpts).then(res => {
res.body.on('error',
/* istanbul ignore next - exceedingly rare and hard to simulate */
er => stream.emit('error', er)
).pipe(stream)
)

res.body.on('integrity', i => {
this.integrity = i
stream.emit('integrity', i)
})

res.body.pipe(stream)
}).catch(er => stream.emit('error', er))

return stream
Expand Down
10 changes: 10 additions & 0 deletions test/remote.js
@@ -1,5 +1,6 @@
const RemoteFetcher = require('../lib/remote.js')
const http = require('http')
const ssri = require('ssri')
const t = require('tap')

const { resolve } = require('path')
Expand All @@ -12,9 +13,11 @@ const fs = require('fs')
const abbrev = resolve(__dirname, 'fixtures/abbrev-1.1.1.tgz')
const port = 12345 + (+process.env.TAP_CHILD_ID || 0)
const server = `http://localhost:${port}`
let abbrevIntegrity
const requestLog = []
t.test('start server', t => {
const data = fs.readFileSync(abbrev)
abbrevIntegrity = ssri.fromData(data)
const httpServer = http.createServer((req, res) => {
res.setHeader('cache-control', 'max-age=432000')
res.setHeader('accept-ranges', 'bytes')
Expand Down Expand Up @@ -112,6 +115,13 @@ t.test('bad integrity', t => {
})
})

t.test('known integrity', async t => {
const url = `${server}/abbrev.tgz`
const f = new RemoteFetcher(url, { cache, integrity: abbrevIntegrity })
await f.extract(me + '/good-integrity')
t.same(f.integrity, abbrevIntegrity, 'got the right integrity back out')
})

t.test('an missing tarball', t => {
const url = `${server}/404`
const f = new RemoteFetcher(url, { cache })
Expand Down

0 comments on commit fdb9e5a

Please sign in to comment.