Skip to content

Commit

Permalink
fix: handle errors from file#createReadStream (#615)
Browse files Browse the repository at this point in the history
* fix: handle errors from file#createReadStream

* add tests

* fix

* once-ify complete handler

* lint
  • Loading branch information
stephenplusplus committed Feb 28, 2019
1 parent d10c120 commit c09f7c2
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 3 deletions.
17 changes: 14 additions & 3 deletions src/file.ts
Expand Up @@ -1212,6 +1212,8 @@ class File extends ServiceObject<File> {
return;
}

rawResponseStream.on('error', onComplete);

const headers = rawResponseStream.toJSON().headers;
const isCompressed = headers['content-encoding'] === 'gzip';
const shouldRunValidation = !rangeRequest && (crc32c || md5);
Expand All @@ -1235,21 +1237,24 @@ class File extends ServiceObject<File> {
rawResponseStream.pipe(pumpify.obj(throughStreams));
}

rawResponseStream.on('end', onComplete).pipe(throughStream, {
end: false
});
rawResponseStream.on('error', onComplete)
.on('end', onComplete)
.pipe(throughStream, {end: false});
};

// This is hooked to the `complete` event from the request stream. This is
// our chance to validate the data and let the user know if anything went
// wrong.
let onCompleteCalled = false;
const onComplete = (err: Error|null) => {
if (err) {
onCompleteCalled = true;
throughStream.destroy(err);
return;
}

if (rangeRequest) {
onCompleteCalled = true;
throughStream.end();
return;
}
Expand All @@ -1260,6 +1265,12 @@ class File extends ServiceObject<File> {
return;
}

if (onCompleteCalled) {
return;
}

onCompleteCalled = true;

const hashes = {
crc32c: this.metadata.crc32c,
md5: this.metadata.md5Hash,
Expand Down
179 changes: 179 additions & 0 deletions test/file.ts
Expand Up @@ -61,6 +61,15 @@ const fakePromisify = {
const fsCached = extend(true, {}, fs);
const fakeFs = extend(true, {}, fsCached);

const zlibCached = extend(true, {}, zlib);
let createGunzipOverride: Function|null;
const fakeZlib = extend(true, {}, zlib, {
createGunzip() {
return (createGunzipOverride || zlibCached.createGunzip)
.apply(null, arguments);
},
});

let hashStreamValidationOverride: Function|null;
const hashStreamValidation = require('hash-stream-validation');
function fakeHashStreamValidation() {
Expand Down Expand Up @@ -148,6 +157,7 @@ describe('File', () => {
'hash-stream-validation': fakeHashStreamValidation,
os: fakeOs,
'xdg-basedir': fakeXdgBasedir,
zlib: fakeZlib,
}).File;
});

Expand Down Expand Up @@ -178,6 +188,7 @@ describe('File', () => {
directoryFile = new File(BUCKET, 'directory/file.jpg');
directoryFile.request = util.noop;

createGunzipOverride = null;
handleRespOverride = null;
hashStreamValidationOverride = null;
makeWritableStreamOverride = null;
Expand Down Expand Up @@ -957,6 +968,83 @@ describe('File', () => {
})
.resume();
});

it('should emit errors from the request stream', done => {
const error = new Error('Error.');
const rawResponseStream = through();
// tslint:disable-next-line:no-any
(rawResponseStream as any).toJSON = () => {
return {headers: {}};
};
const requestStream = through();

handleRespOverride =
(err: Error, res: {}, body: {}, callback: Function) => {
callback(null, null, rawResponseStream);
setImmediate(() => {
rawResponseStream.emit('error', error);
});
};

file.requestStream = () => {
setImmediate(() => {
requestStream.emit('response', rawResponseStream);
});
return requestStream;
};

file.createReadStream()
.on('error',
(err: Error) => {
assert.strictEqual(err, error);
done();
})
.resume();
});

it('should not handle both error and end events', done => {
const error = new Error('Error.');
const rawResponseStream = through();
// tslint:disable-next-line:no-any
(rawResponseStream as any).toJSON = () => {
return {headers: {}};
};
const requestStream = through();

handleRespOverride =
(err: Error, res: {}, body: {}, callback: Function) => {
callback(null, null, rawResponseStream);
setImmediate(() => {
rawResponseStream.emit('error', error);
});
};

file.requestStream = () => {
setImmediate(() => {
requestStream.emit('response', rawResponseStream);
});
return requestStream;
};

file.getMetadata = (options: object, callback: Function) => {
callback();
};

let errorReceived = false;
file.createReadStream({validation: false})
.on('error',
(err: Error) => {
errorReceived = true;
assert.strictEqual(err, error);
rawResponseStream.emit('end');
setImmediate(done);
})
.on('end',
() => {
done(new Error('Should not have been called.'));
})
.resume();
});
});
});

Expand Down Expand Up @@ -995,6 +1083,50 @@ describe('File', () => {
})
.resume();
});

it('should emit errors from the gunzip stream', done => {
const error = new Error('Error.');
const createGunzipStream = through();
createGunzipOverride = () => {
setImmediate(() => {
createGunzipStream.emit('error', error);
});
return createGunzipStream;
};
file.createReadStream()
.on('error',
(err: Error) => {
assert.strictEqual(err, error);
done();
})
.resume();
});

it('should not handle both error and end events', done => {
const error = new Error('Error.');
const createGunzipStream = through();
createGunzipOverride = () => {
setImmediate(() => {
createGunzipStream.emit('error', error);
});
return createGunzipStream;
};
file.getMetadata = (options: object, callback: Function) => {
callback();
};
file.createReadStream({validation: false})
.on('error',
(err: Error) => {
assert.strictEqual(err, error);
createGunzipStream.emit('end');
setImmediate(done);
})
.on('end',
() => {
done(new Error('Should not have been called.'));
})
.resume();
});
});

describe('validation', () => {
Expand Down Expand Up @@ -1022,6 +1154,53 @@ describe('File', () => {
};
});

it('should emit errors from the validation stream', done => {
const error = new Error('Error.');

hashStreamValidationOverride = () => {
setImmediate(() => {
fakeValidationStream.emit('error', error);
});
return fakeValidationStream;
};

file.requestStream = getFakeSuccessfulRequest(data);

file.createReadStream()
.on('error',
(err: Error) => {
assert.strictEqual(err, error);
done();
})
.resume();
});

it('should not handle both error and end events', done => {
const error = new Error('Error.');

hashStreamValidationOverride = () => {
setImmediate(() => {
fakeValidationStream.emit('error', error);
});
return fakeValidationStream;
};

file.requestStream = getFakeSuccessfulRequest(data);

file.createReadStream()
.on('error',
(err: Error) => {
assert.strictEqual(err, error);
fakeValidationStream.emit('end');
setImmediate(done);
})
.on('end',
() => {
done(new Error('Should not have been called.'));
})
.resume();
});

it('should pass the userProject to getMetadata', done => {
const fakeOptions = {
userProject: 'grapce-spaceship-123',
Expand Down

0 comments on commit c09f7c2

Please sign in to comment.